From 8bcdd4b64513ef41750742eb2fe05ea8b0d49ccf Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 29 Apr 2024 10:21:42 +0100 Subject: [PATCH] Rename storage-rocksdb to partition-store --- Cargo.lock | 91 ++++++------ Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../benches/basic_benchmark.rs | 2 +- .../src/deduplication_table/mod.rs | 0 .../src/fsm_table/mod.rs | 0 .../src/idempotency_table/mod.rs | 0 .../src/inbox_table/mod.rs | 0 .../src/invocation_status_table/mod.rs | 0 .../src/journal_table/mod.rs | 0 .../src/keys.rs | 2 +- .../src/lib.rs | 0 .../src/outbox_table/mod.rs | 0 .../src/owned_iter.rs | 0 .../src/partition_store.rs | 0 .../src/partition_store_manager.rs | 0 .../src/scan.rs | 0 crates/partition-store/src/scoped_db.rs | 138 ++++++++++++++++++ .../src/service_status_table/mod.rs | 0 .../src/state_table/mod.rs | 0 .../src/timer_table/mod.rs | 0 .../src/writer.rs | 0 .../tests/idempotency_table_test/mod.rs | 0 .../tests/inbox_table_test/mod.rs | 2 +- .../tests/integration_test.rs | 2 +- .../tests/invocation_status_table_test/mod.rs | 2 +- .../tests/journal_table_test/mod.rs | 2 +- .../tests/outbox_table_test/mod.rs | 2 +- .../tests/state_table_test/mod.rs | 2 +- .../tests/timer_table_test/mod.rs | 2 +- .../virtual_object_status_table_test/mod.rs | 2 +- crates/storage-query-datafusion/Cargo.toml | 6 +- .../storage-query-datafusion/src/context.rs | 2 +- .../src/inbox/table.rs | 2 +- .../src/invocation_status/row.rs | 2 +- .../src/invocation_status/table.rs | 4 +- .../src/journal/row.rs | 2 +- .../src/journal/table.rs | 4 +- .../src/keyed_service_status/row.rs | 2 +- .../src/keyed_service_status/table.rs | 4 +- crates/storage-query-datafusion/src/mocks.rs | 2 +- .../storage-query-datafusion/src/state/row.rs | 2 +- .../src/state/table.rs | 4 +- crates/storage-query-postgres/Cargo.toml | 2 +- crates/worker/Cargo.toml | 12 +- crates/worker/src/lib.rs | 4 +- crates/worker/src/partition/leadership/mod.rs | 2 +- crates/worker/src/partition/mod.rs | 2 +- .../worker/src/partition/state_machine/mod.rs | 2 +- .../worker/src/partition_processor_manager.rs | 2 +- server/Cargo.toml | 2 +- 51 files changed, 227 insertions(+), 90 deletions(-) rename crates/{storage-rocksdb => partition-store}/Cargo.toml (98%) rename crates/{storage-rocksdb => partition-store}/benches/basic_benchmark.rs (97%) rename crates/{storage-rocksdb => partition-store}/src/deduplication_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/fsm_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/idempotency_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/inbox_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/invocation_status_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/journal_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/keys.rs (99%) rename crates/{storage-rocksdb => partition-store}/src/lib.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/outbox_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/owned_iter.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/partition_store.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/partition_store_manager.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/scan.rs (100%) create mode 100644 crates/partition-store/src/scoped_db.rs rename crates/{storage-rocksdb => partition-store}/src/service_status_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/state_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/timer_table/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/src/writer.rs (100%) rename crates/{storage-rocksdb => partition-store}/tests/idempotency_table_test/mod.rs (100%) rename crates/{storage-rocksdb => partition-store}/tests/inbox_table_test/mod.rs (98%) rename crates/{storage-rocksdb => partition-store}/tests/integration_test.rs (98%) rename crates/{storage-rocksdb => partition-store}/tests/invocation_status_table_test/mod.rs (99%) rename crates/{storage-rocksdb => partition-store}/tests/journal_table_test/mod.rs (99%) rename crates/{storage-rocksdb => partition-store}/tests/outbox_table_test/mod.rs (97%) rename crates/{storage-rocksdb => partition-store}/tests/state_table_test/mod.rs (98%) rename crates/{storage-rocksdb => partition-store}/tests/timer_table_test/mod.rs (98%) rename crates/{storage-rocksdb => partition-store}/tests/virtual_object_status_table_test/mod.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index 9ff04a76f8..7373fe5ded 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5801,6 +5801,48 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "restate-partition-store" +version = "0.9.1" +dependencies = [ + "anyhow", + "bytes", + "bytestring", + "codederror", + "criterion", + "derive_builder", + "derive_more", + "drain", + "enum-map", + "futures", + "futures-util", + "googletest", + "num-bigint", + "once_cell", + "paste", + "prost", + "rand", + "restate-core", + "restate-errors", + "restate-rocksdb", + "restate-storage-api", + "restate-test-util", + "restate-types", + "rocksdb", + "schemars", + "serde", + "static_assertions", + "strum 0.26.2", + "strum_macros 0.26.2", + "sync_wrapper", + "tempfile", + "test-log", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "restate-queue" version = "0.9.1" @@ -5922,7 +5964,6 @@ dependencies = [ "restate-fs-util", "restate-node", "restate-rocksdb", - "restate-storage-rocksdb", "restate-tracing-instrumentation", "restate-types", "restate-worker", @@ -6049,11 +6090,11 @@ dependencies = [ "prost", "restate-core", "restate-invoker-api", + "restate-partition-store", "restate-rocksdb", "restate-schema-api", "restate-service-protocol", "restate-storage-api", - "restate-storage-rocksdb", "restate-types", "schemars", "serde", @@ -6083,9 +6124,9 @@ dependencies = [ "prost", "restate-core", "restate-errors", + "restate-partition-store", "restate-storage-api", "restate-storage-query-datafusion", - "restate-storage-rocksdb", "restate-types", "schemars", "serde", @@ -6095,48 +6136,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "restate-storage-rocksdb" -version = "0.9.1" -dependencies = [ - "anyhow", - "bytes", - "bytestring", - "codederror", - "criterion", - "derive_builder", - "derive_more", - "drain", - "enum-map", - "futures", - "futures-util", - "googletest", - "num-bigint", - "once_cell", - "paste", - "prost", - "rand", - "restate-core", - "restate-errors", - "restate-rocksdb", - "restate-storage-api", - "restate-test-util", - "restate-types", - "rocksdb", - "schemars", - "serde", - "static_assertions", - "strum 0.26.2", - "strum_macros 0.26.2", - "sync_wrapper", - "tempfile", - "test-log", - "thiserror", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "restate-test-util" version = "0.9.1" @@ -6319,6 +6318,7 @@ dependencies = [ "restate-metadata-store", "restate-network", "restate-node-protocol", + "restate-partition-store", "restate-rocksdb", "restate-schema", "restate-schema-api", @@ -6328,7 +6328,6 @@ dependencies = [ "restate-storage-api", "restate-storage-query-datafusion", "restate-storage-query-postgres", - "restate-storage-rocksdb", "restate-test-util", "restate-timer", "restate-types", diff --git a/Cargo.toml b/Cargo.toml index f2fdbc7750..0adf3c1be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ restate-network = { path = "crates/network" } restate-node = { path = "crates/node" } restate-node-protocol = { path = "crates/node-protocol" } restate-node-services = { path = "crates/node-services" } +restate-partition-store = { path = "crates/partition-store" } restate-queue = { path = "crates/queue" } restate-rocksdb = { path = "crates/rocksdb" } restate-schema = { path = "crates/schema" } @@ -60,7 +61,6 @@ restate-service-protocol = { path = "crates/service-protocol" } restate-storage-api = { path = "crates/storage-api" } restate-storage-query-datafusion = { path = "crates/storage-query-datafusion" } restate-storage-query-postgres = { path = "crates/storage-query-postgres" } -restate-storage-rocksdb = { path = "crates/storage-rocksdb" } restate-test-util = { path = "crates/test-util" } restate-timer = { path = "crates/timer" } restate-timer-queue = { path = "crates/timer-queue" } diff --git a/crates/storage-rocksdb/Cargo.toml b/crates/partition-store/Cargo.toml similarity index 98% rename from crates/storage-rocksdb/Cargo.toml rename to crates/partition-store/Cargo.toml index 8ad2e7dbfd..2f53349ccc 100644 --- a/crates/storage-rocksdb/Cargo.toml +++ b/crates/partition-store/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "restate-storage-rocksdb" +name = "restate-partition-store" version.workspace = true authors.workspace = true edition.workspace = true diff --git a/crates/storage-rocksdb/benches/basic_benchmark.rs b/crates/partition-store/benches/basic_benchmark.rs similarity index 97% rename from crates/storage-rocksdb/benches/basic_benchmark.rs rename to crates/partition-store/benches/basic_benchmark.rs index c55d20ec5d..9ac57aa2ad 100644 --- a/crates/storage-rocksdb/benches/basic_benchmark.rs +++ b/crates/partition-store/benches/basic_benchmark.rs @@ -12,12 +12,12 @@ use std::ops::RangeInclusive; use criterion::{criterion_group, criterion_main, Criterion}; use restate_core::TaskCenterBuilder; +use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_storage_api::deduplication_table::{ DedupSequenceNumber, DeduplicationTable, ProducerId, }; use restate_storage_api::Transaction; -use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; use restate_types::identifiers::PartitionKey; diff --git a/crates/storage-rocksdb/src/deduplication_table/mod.rs b/crates/partition-store/src/deduplication_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/deduplication_table/mod.rs rename to crates/partition-store/src/deduplication_table/mod.rs diff --git a/crates/storage-rocksdb/src/fsm_table/mod.rs b/crates/partition-store/src/fsm_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/fsm_table/mod.rs rename to crates/partition-store/src/fsm_table/mod.rs diff --git a/crates/storage-rocksdb/src/idempotency_table/mod.rs b/crates/partition-store/src/idempotency_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/idempotency_table/mod.rs rename to crates/partition-store/src/idempotency_table/mod.rs diff --git a/crates/storage-rocksdb/src/inbox_table/mod.rs b/crates/partition-store/src/inbox_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/inbox_table/mod.rs rename to crates/partition-store/src/inbox_table/mod.rs diff --git a/crates/storage-rocksdb/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/invocation_status_table/mod.rs rename to crates/partition-store/src/invocation_status_table/mod.rs diff --git a/crates/storage-rocksdb/src/journal_table/mod.rs b/crates/partition-store/src/journal_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/journal_table/mod.rs rename to crates/partition-store/src/journal_table/mod.rs diff --git a/crates/storage-rocksdb/src/keys.rs b/crates/partition-store/src/keys.rs similarity index 99% rename from crates/storage-rocksdb/src/keys.rs rename to crates/partition-store/src/keys.rs index 021a3f9311..493105d6ce 100644 --- a/crates/storage-rocksdb/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -143,7 +143,7 @@ pub trait TableKey: Sized + std::fmt::Debug + Send + 'static { /// This macro expands to: /// ```ignore /// use bytes::{Buf, BufMut, Bytes}; -/// use restate_storage_rocksdb::TableKind; +/// use restate_partition_store::TableKind; /// #[derive(Debug, Eq, PartialEq)] /// pub struct FooBarKey { /// pub foo: Option, diff --git a/crates/storage-rocksdb/src/lib.rs b/crates/partition-store/src/lib.rs similarity index 100% rename from crates/storage-rocksdb/src/lib.rs rename to crates/partition-store/src/lib.rs diff --git a/crates/storage-rocksdb/src/outbox_table/mod.rs b/crates/partition-store/src/outbox_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/outbox_table/mod.rs rename to crates/partition-store/src/outbox_table/mod.rs diff --git a/crates/storage-rocksdb/src/owned_iter.rs b/crates/partition-store/src/owned_iter.rs similarity index 100% rename from crates/storage-rocksdb/src/owned_iter.rs rename to crates/partition-store/src/owned_iter.rs diff --git a/crates/storage-rocksdb/src/partition_store.rs b/crates/partition-store/src/partition_store.rs similarity index 100% rename from crates/storage-rocksdb/src/partition_store.rs rename to crates/partition-store/src/partition_store.rs diff --git a/crates/storage-rocksdb/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs similarity index 100% rename from crates/storage-rocksdb/src/partition_store_manager.rs rename to crates/partition-store/src/partition_store_manager.rs diff --git a/crates/storage-rocksdb/src/scan.rs b/crates/partition-store/src/scan.rs similarity index 100% rename from crates/storage-rocksdb/src/scan.rs rename to crates/partition-store/src/scan.rs diff --git a/crates/partition-store/src/scoped_db.rs b/crates/partition-store/src/scoped_db.rs new file mode 100644 index 0000000000..67f84e0291 --- /dev/null +++ b/crates/partition-store/src/scoped_db.rs @@ -0,0 +1,138 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use bytes::BytesMut; +use enum_map::{enum_map, EnumMap}; +use restate_rocksdb::{CfName, RocksDb}; +use restate_types::identifiers::PartitionId; +use rocksdb::{BoundColumnFamily, DBCompressionType}; + +use crate::TableKind; + +pub const PARTITION_DATA_CF_PREFIX: &str = "partition-data-"; +pub const PARTITION_KV_CF_PREFIX: &str = "partition-kv-"; + +pub struct PartitionScopedDb<'a> { + partition_id: PartitionId, + rocksdb: &'a Arc, + table_to_cf: EnumMap>>, + key_buffer: BytesMut, + value_buffer: BytesMut, +} + +impl<'a> Clone for PartitionScopedDb<'a> { + fn clone(&self) -> Self { + Self { + partition_id: self.partition_id.clone(), + rocksdb: self.rocksdb, + table_to_cf: self.table_to_cf.clone(), + key_buffer: BytesMut::default(), + value_buffer: BytesMut::default(), + } + } +} + +impl<'a> PartitionScopedDb<'a> { + pub fn new(partition_id: PartitionId, rocksdb: &'a Arc) -> Option { + let data_cf = rocksdb.cf_handle(&data_cf_for_partition(&partition_id))?; + let kv_cf = rocksdb.cf_handle(&kv_cf_for_partition(&partition_id))?; + + let table_to_cf = enum_map! { + // optimized for point lookups + TableKind::State => kv_cf.clone(), + TableKind::ServiceStatus => kv_cf.clone(), + TableKind::InvocationStatus => kv_cf.clone(), + TableKind::Idempotency => kv_cf.clone(), + TableKind::Deduplication => kv_cf.clone(), + TableKind::PartitionStateMachine => kv_cf.clone(), + // optimized for prefix scan lookups + TableKind::Journal => data_cf.clone(), + TableKind::Inbox => data_cf.clone(), + TableKind::Outbox => data_cf.clone(), + TableKind::Timers => data_cf.clone(), + }; + + Some(Self { + partition_id, + rocksdb, + table_to_cf, + key_buffer: BytesMut::default(), + value_buffer: BytesMut::default(), + }) + } + + pub(crate) fn table_handle(&self, table_kind: TableKind) -> &Arc { + &self.table_to_cf[table_kind] + } +} + +pub(crate) fn data_cf_for_partition(partition_id: &PartitionId) -> CfName { + CfName::from(format!("{}{}", PARTITION_DATA_CF_PREFIX, partition_id)) +} + +pub(crate) fn kv_cf_for_partition(partition_id: &PartitionId) -> CfName { + CfName::from(format!("{}{}", PARTITION_KV_CF_PREFIX, partition_id)) +} + +pub(crate) fn partition_ids_to_cfs(partition_ids: &[PartitionId]) -> Vec { + partition_ids + .iter() + .map(data_cf_for_partition) + .chain(partition_ids.iter().map(kv_cf_for_partition)) + .collect() +} + +pub(crate) fn data_cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options { + // Most of the changes are highly temporal, we try to delay flushing + // As much as we can to increase the chances to observe a deletion. + // + cf_options.set_max_write_buffer_number(3); + cf_options.set_min_write_buffer_number_to_merge(2); + // + // Set compactions per level + // + cf_options.set_num_levels(7); + cf_options.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Zstd, + ]); + + cf_options +} + +pub(crate) fn kv_cf_options(mut cf_options: rocksdb::Options) -> rocksdb::Options { + // Most of the changes are highly temporal, we try to delay flushing + // As much as we can to increase the chances to observe a deletion. + // + cf_options.set_max_write_buffer_number(3); + cf_options.set_min_write_buffer_number_to_merge(2); + // + // Set compactions per level + // + cf_options.set_num_levels(7); + cf_options.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Snappy, + DBCompressionType::Zstd, + ]); + + cf_options +} diff --git a/crates/storage-rocksdb/src/service_status_table/mod.rs b/crates/partition-store/src/service_status_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/service_status_table/mod.rs rename to crates/partition-store/src/service_status_table/mod.rs diff --git a/crates/storage-rocksdb/src/state_table/mod.rs b/crates/partition-store/src/state_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/state_table/mod.rs rename to crates/partition-store/src/state_table/mod.rs diff --git a/crates/storage-rocksdb/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs similarity index 100% rename from crates/storage-rocksdb/src/timer_table/mod.rs rename to crates/partition-store/src/timer_table/mod.rs diff --git a/crates/storage-rocksdb/src/writer.rs b/crates/partition-store/src/writer.rs similarity index 100% rename from crates/storage-rocksdb/src/writer.rs rename to crates/partition-store/src/writer.rs diff --git a/crates/storage-rocksdb/tests/idempotency_table_test/mod.rs b/crates/partition-store/tests/idempotency_table_test/mod.rs similarity index 100% rename from crates/storage-rocksdb/tests/idempotency_table_test/mod.rs rename to crates/partition-store/tests/idempotency_table_test/mod.rs diff --git a/crates/storage-rocksdb/tests/inbox_table_test/mod.rs b/crates/partition-store/tests/inbox_table_test/mod.rs similarity index 98% rename from crates/storage-rocksdb/tests/inbox_table_test/mod.rs rename to crates/partition-store/tests/inbox_table_test/mod.rs index cd748c5dd3..13beee13f7 100644 --- a/crates/storage-rocksdb/tests/inbox_table_test/mod.rs +++ b/crates/partition-store/tests/inbox_table_test/mod.rs @@ -10,9 +10,9 @@ use crate::{assert_stream_eq, mock_state_mutation}; use once_cell::sync::Lazy; +use restate_partition_store::PartitionStore; use restate_storage_api::inbox_table::{InboxEntry, InboxTable, SequenceNumberInboxEntry}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, ServiceId}; static INBOX_ENTRIES: Lazy> = Lazy::new(|| { diff --git a/crates/storage-rocksdb/tests/integration_test.rs b/crates/partition-store/tests/integration_test.rs similarity index 98% rename from crates/storage-rocksdb/tests/integration_test.rs rename to crates/partition-store/tests/integration_test.rs index db98574960..57d46353ba 100644 --- a/crates/storage-rocksdb/tests/integration_test.rs +++ b/crates/partition-store/tests/integration_test.rs @@ -11,9 +11,9 @@ use bytes::Bytes; use futures::Stream; use restate_core::TaskCenterBuilder; +use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_storage_api::StorageError; -use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId}; diff --git a/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs b/crates/partition-store/tests/invocation_status_table_test/mod.rs similarity index 99% rename from crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs rename to crates/partition-store/tests/invocation_status_table_test/mod.rs index 216da0157d..73ec1df707 100644 --- a/crates/storage-rocksdb/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/tests/invocation_status_table_test/mod.rs @@ -16,11 +16,11 @@ use super::assert_stream_eq; use bytestring::ByteString; use once_cell::sync::Lazy; +use restate_partition_store::PartitionStore; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, JournalMetadata, StatusTimestamps, }; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ HandlerType, InvocationTarget, ServiceInvocationSpanContext, Source, diff --git a/crates/storage-rocksdb/tests/journal_table_test/mod.rs b/crates/partition-store/tests/journal_table_test/mod.rs similarity index 99% rename from crates/storage-rocksdb/tests/journal_table_test/mod.rs rename to crates/partition-store/tests/journal_table_test/mod.rs index 7ffbcee576..10d67c78b0 100644 --- a/crates/storage-rocksdb/tests/journal_table_test/mod.rs +++ b/crates/partition-store/tests/journal_table_test/mod.rs @@ -12,9 +12,9 @@ use bytes::Bytes; use bytestring::ByteString; use futures_util::StreamExt; use once_cell::sync::Lazy; +use restate_partition_store::PartitionStore; use restate_storage_api::journal_table::{JournalEntry, JournalTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, InvocationUuid}; use restate_types::invocation::{InvocationTarget, ServiceInvocationSpanContext}; use restate_types::journal::enriched::{ diff --git a/crates/storage-rocksdb/tests/outbox_table_test/mod.rs b/crates/partition-store/tests/outbox_table_test/mod.rs similarity index 97% rename from crates/storage-rocksdb/tests/outbox_table_test/mod.rs rename to crates/partition-store/tests/outbox_table_test/mod.rs index 74b661fc26..ed013501d5 100644 --- a/crates/storage-rocksdb/tests/outbox_table_test/mod.rs +++ b/crates/partition-store/tests/outbox_table_test/mod.rs @@ -9,9 +9,9 @@ // by the Apache License, Version 2.0. use crate::mock_random_service_invocation; +use restate_partition_store::PartitionStore; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::PartitionStore; fn mock_outbox_message() -> OutboxMessage { OutboxMessage::ServiceInvocation(mock_random_service_invocation()) diff --git a/crates/storage-rocksdb/tests/state_table_test/mod.rs b/crates/partition-store/tests/state_table_test/mod.rs similarity index 98% rename from crates/storage-rocksdb/tests/state_table_test/mod.rs rename to crates/partition-store/tests/state_table_test/mod.rs index cef7706780..9fad618c48 100644 --- a/crates/storage-rocksdb/tests/state_table_test/mod.rs +++ b/crates/partition-store/tests/state_table_test/mod.rs @@ -10,9 +10,9 @@ use crate::{assert_stream_eq, storage_test_environment}; use bytes::Bytes; +use restate_partition_store::PartitionStore; use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::ServiceId; async fn populate_data(table: &mut T) { diff --git a/crates/storage-rocksdb/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs similarity index 98% rename from crates/storage-rocksdb/tests/timer_table_test/mod.rs rename to crates/partition-store/tests/timer_table_test/mod.rs index fb5de2cff4..c6093e2dec 100644 --- a/crates/storage-rocksdb/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -10,9 +10,9 @@ use crate::mock_service_invocation; use futures_util::StreamExt; +use restate_partition_store::PartitionStore; use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; use restate_storage_api::Transaction; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationUuid, ServiceId}; use restate_types::invocation::ServiceInvocation; use std::pin::pin; diff --git a/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs b/crates/partition-store/tests/virtual_object_status_table_test/mod.rs similarity index 97% rename from crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs rename to crates/partition-store/tests/virtual_object_status_table_test/mod.rs index 1e8299638a..684a432de0 100644 --- a/crates/storage-rocksdb/tests/virtual_object_status_table_test/mod.rs +++ b/crates/partition-store/tests/virtual_object_status_table_test/mod.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use restate_partition_store::PartitionStore; use restate_storage_api::service_status_table::{VirtualObjectStatus, VirtualObjectStatusTable}; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, InvocationUuid, ServiceId}; const FIXTURE_INVOCATION: InvocationUuid = diff --git a/crates/storage-query-datafusion/Cargo.toml b/crates/storage-query-datafusion/Cargo.toml index f2a9e183ca..0652ded4c1 100644 --- a/crates/storage-query-datafusion/Cargo.toml +++ b/crates/storage-query-datafusion/Cargo.toml @@ -12,12 +12,12 @@ default = [] options_schema = ["dep:schemars"] [dependencies] -restate-storage-rocksdb = { workspace = true } -restate-types = { workspace = true } +restate-invoker-api = { workspace = true } +restate-partition-store = { workspace = true } restate-schema-api = { workspace = true, features = ["deployment"] } restate-service-protocol = { workspace = true, features = ["codec"] } restate-storage-api = { workspace = true } -restate-invoker-api = { workspace = true } +restate-types = { workspace = true } ahash = { workspace = true } # Required to due a yanked version used by datafusion async-trait = { workspace = true } diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 837c63a2d2..09fec57b36 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -19,9 +19,9 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; use restate_invoker_api::StatusHandle; +use restate_partition_store::PartitionStore; use restate_schema_api::deployment::DeploymentResolver; use restate_schema_api::service::ServiceMetadataResolver; -use restate_storage_rocksdb::PartitionStore; use restate_types::config::QueryEngineOptions; use crate::{analyzer, physical_optimizer}; diff --git a/crates/storage-query-datafusion/src/inbox/table.rs b/crates/storage-query-datafusion/src/inbox/table.rs index 727ddc2fc5..3b308abb1d 100644 --- a/crates/storage-query-datafusion/src/inbox/table.rs +++ b/crates/storage-query-datafusion/src/inbox/table.rs @@ -23,9 +23,9 @@ use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; use futures::{Stream, StreamExt}; +use restate_partition_store::PartitionStore; use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry}; use restate_storage_api::StorageError; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index 54fcf66477..5d89f5b5ec 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -10,10 +10,10 @@ use crate::invocation_status::schema::{InvocationStatusBuilder, InvocationStatusRowBuilder}; use crate::table_util::format_using; +use restate_partition_store::invocation_status_table::OwnedInvocationStatusRow; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, JournalMetadata, StatusTimestamps, }; -use restate_storage_rocksdb::invocation_status_table::OwnedInvocationStatusRow; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ServiceType, Source, TraceId}; diff --git a/crates/storage-query-datafusion/src/invocation_status/table.rs b/crates/storage-query-datafusion/src/invocation_status/table.rs index 08e4110c6c..aa3b06fdfc 100644 --- a/crates/storage-query-datafusion/src/invocation_status/table.rs +++ b/crates/storage-query-datafusion/src/invocation_status/table.rs @@ -22,8 +22,8 @@ use crate::invocation_status::schema::InvocationStatusBuilder; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; -use restate_storage_rocksdb::invocation_status_table::OwnedInvocationStatusRow; -use restate_storage_rocksdb::PartitionStore; +use restate_partition_store::invocation_status_table::OwnedInvocationStatusRow; +use restate_partition_store::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; diff --git a/crates/storage-query-datafusion/src/journal/row.rs b/crates/storage-query-datafusion/src/journal/row.rs index cf99d67b60..8e73bf9500 100644 --- a/crates/storage-query-datafusion/src/journal/row.rs +++ b/crates/storage-query-datafusion/src/journal/row.rs @@ -12,8 +12,8 @@ use crate::journal::schema::JournalBuilder; use restate_service_protocol::codec::ProtobufRawEntryCodec; +use restate_partition_store::journal_table::OwnedJournalRow; use restate_storage_api::journal_table::JournalEntry; -use restate_storage_rocksdb::journal_table::OwnedJournalRow; use restate_types::identifiers::WithPartitionKey; use restate_types::journal::enriched::{EnrichedEntryHeader, EnrichedRawEntry}; diff --git a/crates/storage-query-datafusion/src/journal/table.rs b/crates/storage-query-datafusion/src/journal/table.rs index 71d9217a43..d4ef38d510 100644 --- a/crates/storage-query-datafusion/src/journal/table.rs +++ b/crates/storage-query-datafusion/src/journal/table.rs @@ -22,8 +22,8 @@ use crate::journal::schema::JournalBuilder; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; -use restate_storage_rocksdb::journal_table::OwnedJournalRow; -use restate_storage_rocksdb::PartitionStore; +use restate_partition_store::journal_table::OwnedJournalRow; +use restate_partition_store::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; diff --git a/crates/storage-query-datafusion/src/keyed_service_status/row.rs b/crates/storage-query-datafusion/src/keyed_service_status/row.rs index a11c3cea2f..fedb6f67c0 100644 --- a/crates/storage-query-datafusion/src/keyed_service_status/row.rs +++ b/crates/storage-query-datafusion/src/keyed_service_status/row.rs @@ -10,8 +10,8 @@ use crate::keyed_service_status::schema::KeyedServiceStatusBuilder; use crate::table_util::format_using; +use restate_partition_store::service_status_table::OwnedVirtualObjectStatusRow; use restate_storage_api::service_status_table::VirtualObjectStatus; -use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow; #[inline] pub(crate) fn append_virtual_object_status_row( diff --git a/crates/storage-query-datafusion/src/keyed_service_status/table.rs b/crates/storage-query-datafusion/src/keyed_service_status/table.rs index 66b50d56b7..dc769c2f81 100644 --- a/crates/storage-query-datafusion/src/keyed_service_status/table.rs +++ b/crates/storage-query-datafusion/src/keyed_service_status/table.rs @@ -22,8 +22,8 @@ use crate::keyed_service_status::schema::KeyedServiceStatusBuilder; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; -use restate_storage_rocksdb::service_status_table::OwnedVirtualObjectStatusRow; -use restate_storage_rocksdb::PartitionStore; +use restate_partition_store::service_status_table::OwnedVirtualObjectStatusRow; +use restate_partition_store::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; diff --git a/crates/storage-query-datafusion/src/mocks.rs b/crates/storage-query-datafusion/src/mocks.rs index 319482971f..9bf4943474 100644 --- a/crates/storage-query-datafusion/src/mocks.rs +++ b/crates/storage-query-datafusion/src/mocks.rs @@ -17,12 +17,12 @@ use googletest::matcher::{Matcher, MatcherResult}; use restate_core::task_center; use restate_invoker_api::status_handle::mocks::MockStatusHandle; use restate_invoker_api::StatusHandle; +use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry; use restate_schema_api::deployment::{Deployment, DeploymentResolver}; use restate_schema_api::service::mocks::MockServiceMetadataResolver; use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver}; -use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions}; use restate_types::identifiers::{DeploymentId, PartitionKey, ServiceRevision}; diff --git a/crates/storage-query-datafusion/src/state/row.rs b/crates/storage-query-datafusion/src/state/row.rs index 0c224a84f6..0c659f8718 100644 --- a/crates/storage-query-datafusion/src/state/row.rs +++ b/crates/storage-query-datafusion/src/state/row.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use crate::state::schema::StateBuilder; -use restate_storage_rocksdb::state_table::OwnedStateRow; +use restate_partition_store::state_table::OwnedStateRow; #[inline] pub(crate) fn append_state_row(builder: &mut StateBuilder, state_row: OwnedStateRow) { diff --git a/crates/storage-query-datafusion/src/state/table.rs b/crates/storage-query-datafusion/src/state/table.rs index 3f95156f25..4bf549d138 100644 --- a/crates/storage-query-datafusion/src/state/table.rs +++ b/crates/storage-query-datafusion/src/state/table.rs @@ -22,8 +22,8 @@ use crate::state::schema::StateBuilder; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; pub use datafusion_expr::UserDefinedLogicalNode; -use restate_storage_rocksdb::state_table::OwnedStateRow; -use restate_storage_rocksdb::PartitionStore; +use restate_partition_store::state_table::OwnedStateRow; +use restate_partition_store::PartitionStore; use restate_types::identifiers::PartitionKey; use tokio::sync::mpsc::Sender; diff --git a/crates/storage-query-postgres/Cargo.toml b/crates/storage-query-postgres/Cargo.toml index ca850947f6..4572df46d2 100644 --- a/crates/storage-query-postgres/Cargo.toml +++ b/crates/storage-query-postgres/Cargo.toml @@ -14,9 +14,9 @@ options_schema = ["dep:schemars"] [dependencies] restate-core = { workspace = true } restate-errors = { workspace = true } +restate-partition-store = {workspace = true } restate-storage-api = {workspace = true} restate-storage-query-datafusion = {workspace = true } -restate-storage-rocksdb = {workspace = true } restate-types = { workspace = true } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 419a797ccc..1bdb15f775 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -11,13 +11,13 @@ publish = false default = [] options_schema = [ "dep:schemars", - "restate-timer/options_schema", - "restate-storage-rocksdb/options_schema", - "restate-storage-query-datafusion/options_schema", - "restate-storage-query-postgres/options_schema", - "restate-ingress-kafka/options_schema", "restate-ingress-http/options_schema", + "restate-ingress-kafka/options_schema", "restate-invoker-impl/options_schema", + "restate-partition-store/options_schema", + "restate-storage-query-datafusion/options_schema", + "restate-storage-query-postgres/options_schema", + "restate-timer/options_schema", ] [dependencies] @@ -32,6 +32,7 @@ restate-invoker-impl = { workspace = true } restate-metadata-store = { workspace = true } restate-network = { workspace = true } restate-node-protocol = { workspace = true } +restate-partition-store = { workspace = true } restate-rocksdb = { workspace = true } restate-schema = { workspace = true } restate-schema-api = { workspace = true, features = [ "service", "subscription"] } @@ -41,7 +42,6 @@ restate-service-protocol = { workspace = true, features = [ "codec", "awakeable- restate-storage-api = { workspace = true } restate-storage-query-datafusion = { workspace = true } restate-storage-query-postgres = { workspace = true } -restate-storage-rocksdb = { workspace = true } restate-timer = { workspace = true } restate-types = { workspace = true } restate-wal-protocol = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index b71f387445..cd2e06f90c 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -38,11 +38,11 @@ use restate_invoker_impl::{ }; use restate_metadata_store::MetadataStoreClient; use restate_network::Networking; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_schema::UpdateableSchema; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_query_datafusion::context::QueryContext; use restate_storage_query_postgres::service::PostgresQueryService; -use restate_storage_rocksdb::{PartitionStore, PartitionStoreManager}; use crate::invoker_integration::EntryEnricher; use crate::partition::storage::invoker::InvokerStorageReader; @@ -70,7 +70,7 @@ pub enum BuildError { RocksDB( #[from] #[code] - restate_storage_rocksdb::BuildError, + restate_partition_store::BuildError, ), #[error("failed opening partition store: {0}")] RocksDb( diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 5d94a6ebb9..b12e92606a 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -33,8 +33,8 @@ use crate::partition::state_machine::Action; pub(crate) use action_collector::{ActionEffect, ActionEffectStream}; use restate_bifrost::Bifrost; use restate_errors::NotRunningError; +use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::EpochSequenceNumber; -use restate_storage_rocksdb::PartitionStore; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; use restate_wal_protocol::timer::TimerValue; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index dd530e75ba..905a19c0b0 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -17,7 +17,7 @@ use futures::StreamExt; use metrics::counter; use restate_core::metadata; use restate_network::Networking; -use restate_storage_rocksdb::{PartitionStore, RocksDBTransaction}; +use restate_partition_store::{PartitionStore, RocksDBTransaction}; use restate_types::identifiers::{PartitionId, PartitionKey}; use std::fmt::Debug; use std::marker::PhantomData; diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index d5dd66394e..03e19a225d 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -94,6 +94,7 @@ mod tests { use googletest::{all, assert_that, pat, property}; use restate_core::{task_center, TaskCenterBuilder}; use restate_invoker_api::InvokeInputJournal; + use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_api::invocation_status_table::{ @@ -107,7 +108,6 @@ mod tests { }; use restate_storage_api::state_table::{ReadOnlyStateTable, StateTable}; use restate_storage_api::Transaction; - use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_test_util::matchers::*; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, WorkerOptions}; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 822ea10eb2..7d4d4db5e7 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -16,7 +16,7 @@ use restate_core::{metadata, task_center, ShutdownError, TaskId, TaskKind}; use restate_invoker_impl::InvokerHandle; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_network::Networking; -use restate_storage_rocksdb::{OpenMode, PartitionStore, PartitionStoreManager}; +use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_types::arc_util::ArcSwapExt; use restate_types::config::{UpdateableConfiguration, WorkerOptions}; use restate_types::epoch::EpochMetadata; diff --git a/server/Cargo.toml b/server/Cargo.toml index 6b7acb3ea4..11812fcc67 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -32,7 +32,7 @@ restate-errors = { workspace = true, features = ["include_doc"] } restate-fs-util = { workspace = true } restate-node = { workspace = true } restate-rocksdb = { workspace = true } -restate-storage-rocksdb = { workspace = true } +#restate-partition-store = { workspace = true } restate-tracing-instrumentation = { workspace = true, features = ["rt-tokio"] } restate-types = { workspace = true, features = ["clap"] } restate-worker = { workspace = true }