Skip to content

Commit

Permalink
Rename MetadataStoreClientOptions into MetadataClientOptions
Browse files Browse the repository at this point in the history
This commit also moves some of the metadata client specific options back
under the [metadata-client] key. It tries to achieve backwards compatibility
with the previous configuration names. The metadata client used with the local
and the replicated metadata server is now called "native" ("embedded" still works
as an alias).
  • Loading branch information
tillrohrmann committed Feb 5, 2025
1 parent a2716d6 commit 8f5bc5c
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 153 deletions.
14 changes: 7 additions & 7 deletions crates/core/src/metadata_store/providers/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use etcd_client::{
TxnOp,
};

use restate_types::config::MetadataStoreClientOptions;
use restate_types::config::MetadataClientOptions;
use restate_types::errors::GenericError;

use crate::metadata_store::{
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct EtcdMetadataStore {
impl EtcdMetadataStore {
pub async fn new<S: AsRef<[A]>, A: AsRef<str>>(
addresses: S,
metadata_options: &MetadataStoreClientOptions,
metadata_options: &MetadataClientOptions,
) -> anyhow::Result<Self> {
let opts = ConnectOptions::new()
.with_connect_timeout(metadata_options.connect_timeout())
Expand Down Expand Up @@ -323,7 +323,7 @@ impl ProvisionedMetadataStore for EtcdMetadataStore {
mod test {
use bytes::Bytes;
use bytestring::ByteString;
use restate_types::{config::MetadataStoreClientOptions, Version};
use restate_types::{config::MetadataClientOptions, Version};

use super::EtcdMetadataStore;
use crate::metadata_store::{MetadataStore, Precondition, VersionedValue, WriteError};
Expand All @@ -337,7 +337,7 @@ mod test {
#[ignore]
#[tokio::test]
async fn test_put_does_not_exist() {
let opts = MetadataStoreClientOptions::default();
let opts = MetadataClientOptions::default();
let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap();

let key: ByteString = "put_does_not_exist".into();
Expand All @@ -364,7 +364,7 @@ mod test {
#[ignore]
#[tokio::test]
async fn test_put_with_version() {
let opts = MetadataStoreClientOptions::default();
let opts = MetadataClientOptions::default();
let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap();

let key: ByteString = "put_with_version".into();
Expand Down Expand Up @@ -422,7 +422,7 @@ mod test {
#[ignore]
#[tokio::test]
async fn test_put_force() {
let opts = MetadataStoreClientOptions::default();
let opts = MetadataClientOptions::default();
let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap();

let key: ByteString = "put_force".into();
Expand Down Expand Up @@ -466,7 +466,7 @@ mod test {
#[ignore]
#[tokio::test]
async fn test_delete() {
let opts = MetadataStoreClientOptions::default();
let opts = MetadataClientOptions::default();
let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap();

let key: ByteString = "put_delete_me".into();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/metadata_store/providers/objstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLock
use crate::metadata_store::providers::objstore::version_repository::VersionRepository;
use crate::metadata_store::MetadataStore;
use crate::{TaskCenter, TaskKind};
use restate_types::config::MetadataStoreClient;
use restate_types::config::MetadataClientKind;
use restate_types::errors::GenericError;

mod glue;
Expand All @@ -22,7 +22,7 @@ mod optimistic_store;
mod version_repository;

pub async fn create_object_store_based_meta_store(
configuration: MetadataStoreClient,
configuration: MetadataClientKind,
) -> Result<impl MetadataStore, GenericError> {
// obtain an instance of a version repository from the configuration.
// we use an object_store backed version repository.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVe
use crate::metadata_store::providers::objstore::version_repository::{
Tag, TaggedValue, VersionRepository, VersionRepositoryError,
};
use restate_types::config::{MetadataStoreClient, ObjectStoreCredentials};
use restate_types::config::{MetadataClientKind, ObjectStoreCredentials};

#[derive(Debug)]
pub(crate) struct ObjectStoreVersionRepository {
object_store: Box<dyn ObjectStore>,
}

impl ObjectStoreVersionRepository {
pub(crate) fn from_configuration(configuration: MetadataStoreClient) -> anyhow::Result<Self> {
let MetadataStoreClient::ObjectStore {
pub(crate) fn from_configuration(configuration: MetadataClientKind) -> anyhow::Result<Self> {
let MetadataClientKind::ObjectStore {
credentials,
bucket,
..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ use crate::metadata_store::providers::objstore::version_repository::{
TaggedValue, VersionRepository, VersionRepositoryError,
};
use crate::metadata_store::{Precondition, ReadError, VersionedValue, WriteError};
use restate_types::config::MetadataStoreClient;
use restate_types::config::MetadataClientKind;
use restate_types::Version;

pub(crate) struct OptimisticLockingMetadataStoreBuilder {
pub(crate) version_repository: Box<dyn VersionRepository>,
pub(crate) configuration: MetadataStoreClient,
pub(crate) configuration: MetadataClientKind,
}

impl OptimisticLockingMetadataStoreBuilder {
pub(crate) async fn build(self) -> anyhow::Result<OptimisticLockingMetadataStore> {
let MetadataStoreClient::ObjectStore { .. } = self.configuration else {
let MetadataClientKind::ObjectStore { .. } = self.configuration else {
anyhow::bail!("unexpected configuration value");
};
Ok(OptimisticLockingMetadataStore::new(self.version_repository))
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/network/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio_util::net::Listener;
use tonic::transport::{Channel, Endpoint};
use tracing::{debug, error_span, info, instrument, trace, Instrument, Span};

use restate_types::config::{Configuration, MetadataStoreClientOptions, NetworkingOptions};
use restate_types::config::{Configuration, MetadataClientOptions, NetworkingOptions};
use restate_types::errors::GenericError;
use restate_types::net::{AdvertisedAddress, BindAddress};

Expand Down Expand Up @@ -292,21 +292,21 @@ impl CommonClientConnectionOptions for NetworkingOptions {
}
}

impl CommonClientConnectionOptions for MetadataStoreClientOptions {
impl CommonClientConnectionOptions for MetadataClientOptions {
fn connect_timeout(&self) -> Duration {
self.metadata_store_connect_timeout.into()
self.connect_timeout.into()
}

fn request_timeout(&self) -> Option<Duration> {
None
}

fn keep_alive_interval(&self) -> Duration {
self.metadata_store_keep_alive_interval.into()
self.keep_alive_interval.into()
}

fn keep_alive_timeout(&self) -> Duration {
self.metadata_store_keep_alive_timeout.into()
self.keep_alive_timeout.into()
}

fn http2_adaptive_window(&self) -> bool {
Expand Down
22 changes: 7 additions & 15 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_types::partition_table::PartitionReplication;
use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::retries::RetryPolicy;
use restate_types::{
config::{Configuration, MetadataStoreClient},
config::{Configuration, MetadataClientKind},
errors::GenericError,
metadata_store::keys::NODES_CONFIG_KEY,
net::{AdvertisedAddress, BindAddress},
Expand Down Expand Up @@ -132,12 +132,8 @@ impl Node {
&self.base_config.common.advertised_address
}

pub fn metadata_store_client_mut(&mut self) -> &mut MetadataStoreClient {
&mut self
.base_config
.common
.metadata_store_client
.metadata_store_client
pub fn metadata_store_client_mut(&mut self) -> &mut MetadataClientKind {
&mut self.base_config.common.metadata_client.kind
}

pub fn config(&self) -> &Configuration {
Expand Down Expand Up @@ -231,7 +227,7 @@ impl Node {

// update nodes with the addresses of the other nodes
for node in &mut nodes {
*node.metadata_store_client_mut() = MetadataStoreClient::Embedded {
*node.metadata_store_client_mut() = MetadataClientKind::Native {
addresses: node_addresses.clone(),
}
}
Expand All @@ -251,11 +247,8 @@ impl Node {
let base_dir = base_dir.into();

// ensure file paths are relative to the base dir
if let MetadataStoreClient::Embedded { addresses } = &mut self
.base_config
.common
.metadata_store_client
.metadata_store_client
if let MetadataClientKind::Native { addresses } =
&mut self.base_config.common.metadata_client.kind
{
for advertised_address in addresses {
if let AdvertisedAddress::Uds(file) = advertised_address {
Expand Down Expand Up @@ -721,8 +714,7 @@ impl StartedNode {
pub async fn metadata_client(
&self,
) -> Result<restate_metadata_server::MetadataStoreClient, GenericError> {
restate_metadata_server::create_client(self.config().common.metadata_store_client.clone())
.await
restate_metadata_server::create_client(self.config().common.metadata_client.clone()).await
}

/// Check to see if the admin address is healthy. Returns false if this node has no admin role.
Expand Down
8 changes: 4 additions & 4 deletions crates/metadata-server/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_core::metadata_store::{
};
use restate_core::network::net_util::create_tonic_channel;
use restate_core::{cancellation_watcher, Metadata, TaskCenter, TaskKind};
use restate_types::config::{Configuration, MetadataStoreClientOptions};
use restate_types::config::{Configuration, MetadataClientOptions};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::{MetadataServerState, NodesConfiguration, Role};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct GrpcMetadataServerClient {
impl GrpcMetadataServerClient {
pub fn new(
metadata_store_addresses: Vec<AdvertisedAddress>,
client_options: MetadataStoreClientOptions,
client_options: MetadataClientOptions,
) -> Self {
let channel_manager = ChannelManager::new(metadata_store_addresses, client_options);
let svc_client = Arc::new(Mutex::new(
Expand Down Expand Up @@ -362,13 +362,13 @@ impl StatusError {
#[derive(Clone, Debug)]
struct ChannelManager {
channels: Arc<Mutex<Channels>>,
client_options: MetadataStoreClientOptions,
client_options: MetadataClientOptions,
}

impl ChannelManager {
fn new(
initial_addresses: Vec<AdvertisedAddress>,
client_options: MetadataStoreClientOptions,
client_options: MetadataClientOptions,
) -> Self {
let initial_channels: Vec<_> = initial_addresses
.into_iter()
Expand Down
21 changes: 8 additions & 13 deletions crates/metadata-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ pub use restate_core::metadata_store::{
use restate_core::network::NetworkServerBuilder;
use restate_core::{MetadataWriter, ShutdownError};
use restate_types::config::{
Configuration, MetadataServerKind, MetadataServerOptions, MetadataStoreClientOptions,
RocksDbOptions,
Configuration, MetadataClientOptions, MetadataServerKind, MetadataServerOptions, RocksDbOptions,
};
use restate_types::errors::GenericError;
use restate_types::health::HealthStatus;
Expand Down Expand Up @@ -613,25 +612,21 @@ impl Default for MetadataServerConfiguration {

/// Creates a [`MetadataStoreClient`] for configured metadata store.
pub async fn create_client(
metadata_store_client_options: MetadataStoreClientOptions,
metadata_store_client_options: MetadataClientOptions,
) -> Result<MetadataStoreClient, GenericError> {
let backoff_policy = Some(
metadata_store_client_options
.metadata_store_client_backoff_policy
.clone(),
);

let client = match metadata_store_client_options.metadata_store_client.clone() {
config::MetadataStoreClient::Embedded { addresses } => {
let backoff_policy = Some(metadata_store_client_options.backoff_policy.clone());

let client = match metadata_store_client_options.kind.clone() {
config::MetadataClientKind::Native { addresses } => {
let inner_client =
GrpcMetadataServerClient::new(addresses, metadata_store_client_options);
MetadataStoreClient::new(inner_client, backoff_policy)
}
config::MetadataStoreClient::Etcd { addresses } => {
config::MetadataClientKind::Etcd { addresses } => {
let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?;
MetadataStoreClient::new(store, backoff_policy)
}
conf @ config::MetadataStoreClient::ObjectStore { .. } => {
conf @ config::MetadataClientKind::ObjectStore { .. } => {
let store = create_object_store_based_meta_store(conf).await?;
MetadataStoreClient::new(store, backoff_policy)
}
Expand Down
18 changes: 9 additions & 9 deletions crates/metadata-server/src/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use restate_core::network::{FailingConnector, NetworkServerBuilder};
use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::config::{
self, reset_base_temp_dir_and_retain, Configuration, MetadataServerOptions,
MetadataStoreClientOptions, RocksDbOptions,
self, reset_base_temp_dir_and_retain, Configuration, MetadataClientOptions,
MetadataServerOptions, RocksDbOptions,
};
use restate_types::health::HealthStatus;
use restate_types::live::{BoxedLiveLoad, Live};
Expand Down Expand Up @@ -209,7 +209,7 @@ async fn durable_storage() -> anyhow::Result<()> {
// reset RocksDbManager to allow restarting the metadata store
RocksDbManager::get().reset().await?;

let metadata_store_client_opts = MetadataStoreClientOptions::default();
let metadata_store_client_opts = MetadataClientOptions::default();
let metadata_store_opts = opts.clone();
let metadata_store_opts = Live::from_value(metadata_store_opts);
let client = start_metadata_server(
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn create_test_environment(
RocksDbManager::init(config.clone().map(|c| &c.common));

let client = start_metadata_server(
config.pinned().common.metadata_store_client.clone(),
config.pinned().common.metadata_client.clone(),
&config.pinned().metadata_server,
config.clone().map(|c| &c.metadata_server.rocksdb).boxed(),
)
Expand All @@ -268,7 +268,7 @@ async fn create_test_environment(
}

async fn start_metadata_server(
mut metadata_store_client_options: MetadataStoreClientOptions,
mut metadata_store_client_options: MetadataClientOptions,
opts: &MetadataServerOptions,
updateables_rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
) -> anyhow::Result<MetadataStoreClient> {
Expand All @@ -283,7 +283,7 @@ async fn start_metadata_server(

let uds = tempfile::tempdir()?.into_path().join("metadata-rpc-server");
let bind_address = BindAddress::Uds(uds.clone());
metadata_store_client_options.metadata_store_client = config::MetadataStoreClient::Embedded {
metadata_store_client_options.kind = config::MetadataClientKind::Native {
addresses: vec![AdvertisedAddress::Uds(uds)],
};

Expand All @@ -307,8 +307,8 @@ async fn start_metadata_server(
)?;

assert2::let_assert!(
config::MetadataStoreClient::Embedded { addresses } =
metadata_store_client_options.metadata_store_client.clone()
config::MetadataClientKind::Native { addresses } =
metadata_store_client_options.kind.clone()
);

rpc_server_health_status
Expand All @@ -319,7 +319,7 @@ async fn start_metadata_server(
GrpcMetadataServerClient::new(addresses, metadata_store_client_options.clone());
let client = MetadataStoreClient::new(
grpc_client,
Some(metadata_store_client_options.metadata_store_client_backoff_policy),
Some(metadata_store_client_options.backoff_policy),
);

Ok(client)
Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Node {
cluster_marker::validate_and_update_cluster_marker(config.common.cluster_name())?;

let metadata_store_client =
restate_metadata_server::create_client(config.common.metadata_store_client.clone())
restate_metadata_server::create_client(config.common.metadata_client.clone())
.await
.map_err(BuildError::MetadataStoreClient)?;
let metadata_manager =
Expand Down
Loading

0 comments on commit 8f5bc5c

Please sign in to comment.