diff --git a/crates/core/src/metadata_store/providers/etcd.rs b/crates/core/src/metadata_store/providers/etcd.rs index e8f5b0ddee..257a8e6460 100644 --- a/crates/core/src/metadata_store/providers/etcd.rs +++ b/crates/core/src/metadata_store/providers/etcd.rs @@ -16,7 +16,7 @@ use etcd_client::{ TxnOp, }; -use restate_types::config::MetadataStoreClientOptions; +use restate_types::config::MetadataClientOptions; use restate_types::errors::GenericError; use crate::metadata_store::{ @@ -73,7 +73,7 @@ pub struct EtcdMetadataStore { impl EtcdMetadataStore { pub async fn new, A: AsRef>( addresses: S, - metadata_options: &MetadataStoreClientOptions, + metadata_options: &MetadataClientOptions, ) -> anyhow::Result { let opts = ConnectOptions::new() .with_connect_timeout(metadata_options.connect_timeout()) @@ -323,7 +323,7 @@ impl ProvisionedMetadataStore for EtcdMetadataStore { mod test { use bytes::Bytes; use bytestring::ByteString; - use restate_types::{config::MetadataStoreClientOptions, Version}; + use restate_types::{config::MetadataClientOptions, Version}; use super::EtcdMetadataStore; use crate::metadata_store::{MetadataStore, Precondition, VersionedValue, WriteError}; @@ -337,7 +337,7 @@ mod test { #[ignore] #[tokio::test] async fn test_put_does_not_exist() { - let opts = MetadataStoreClientOptions::default(); + let opts = MetadataClientOptions::default(); let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap(); let key: ByteString = "put_does_not_exist".into(); @@ -364,7 +364,7 @@ mod test { #[ignore] #[tokio::test] async fn test_put_with_version() { - let opts = MetadataStoreClientOptions::default(); + let opts = MetadataClientOptions::default(); let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap(); let key: ByteString = "put_with_version".into(); @@ -422,7 +422,7 @@ mod test { #[ignore] #[tokio::test] async fn test_put_force() { - let opts = MetadataStoreClientOptions::default(); + let opts = MetadataClientOptions::default(); let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap(); let key: ByteString = "put_force".into(); @@ -466,7 +466,7 @@ mod test { #[ignore] #[tokio::test] async fn test_delete() { - let opts = MetadataStoreClientOptions::default(); + let opts = MetadataClientOptions::default(); let client = EtcdMetadataStore::new(&TEST_ADDRESS, &opts).await.unwrap(); let key: ByteString = "put_delete_me".into(); diff --git a/crates/core/src/metadata_store/providers/objstore/mod.rs b/crates/core/src/metadata_store/providers/objstore/mod.rs index 3d81fc0db9..73abde3a23 100644 --- a/crates/core/src/metadata_store/providers/objstore/mod.rs +++ b/crates/core/src/metadata_store/providers/objstore/mod.rs @@ -13,7 +13,7 @@ use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLock use crate::metadata_store::providers::objstore::version_repository::VersionRepository; use crate::metadata_store::MetadataStore; use crate::{TaskCenter, TaskKind}; -use restate_types::config::MetadataStoreClient; +use restate_types::config::MetadataClientKind; use restate_types::errors::GenericError; mod glue; @@ -22,7 +22,7 @@ mod optimistic_store; mod version_repository; pub async fn create_object_store_based_meta_store( - configuration: MetadataStoreClient, + configuration: MetadataClientKind, ) -> Result { // obtain an instance of a version repository from the configuration. // we use an object_store backed version repository. diff --git a/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs b/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs index a6528cd68e..6fbe85bea4 100644 --- a/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs +++ b/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs @@ -18,7 +18,7 @@ use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVe use crate::metadata_store::providers::objstore::version_repository::{ Tag, TaggedValue, VersionRepository, VersionRepositoryError, }; -use restate_types::config::{MetadataStoreClient, ObjectStoreCredentials}; +use restate_types::config::{MetadataClientKind, ObjectStoreCredentials}; #[derive(Debug)] pub(crate) struct ObjectStoreVersionRepository { @@ -26,8 +26,8 @@ pub(crate) struct ObjectStoreVersionRepository { } impl ObjectStoreVersionRepository { - pub(crate) fn from_configuration(configuration: MetadataStoreClient) -> anyhow::Result { - let MetadataStoreClient::ObjectStore { + pub(crate) fn from_configuration(configuration: MetadataClientKind) -> anyhow::Result { + let MetadataClientKind::ObjectStore { credentials, bucket, .. diff --git a/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs b/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs index 23a8f72f58..5b2c0a130c 100644 --- a/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs +++ b/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs @@ -17,17 +17,17 @@ use crate::metadata_store::providers::objstore::version_repository::{ TaggedValue, VersionRepository, VersionRepositoryError, }; use crate::metadata_store::{Precondition, ReadError, VersionedValue, WriteError}; -use restate_types::config::MetadataStoreClient; +use restate_types::config::MetadataClientKind; use restate_types::Version; pub(crate) struct OptimisticLockingMetadataStoreBuilder { pub(crate) version_repository: Box, - pub(crate) configuration: MetadataStoreClient, + pub(crate) configuration: MetadataClientKind, } impl OptimisticLockingMetadataStoreBuilder { pub(crate) async fn build(self) -> anyhow::Result { - let MetadataStoreClient::ObjectStore { .. } = self.configuration else { + let MetadataClientKind::ObjectStore { .. } = self.configuration else { anyhow::bail!("unexpected configuration value"); }; Ok(OptimisticLockingMetadataStore::new(self.version_repository)) diff --git a/crates/core/src/network/net_util.rs b/crates/core/src/network/net_util.rs index d6834ab3a0..e56be8d262 100644 --- a/crates/core/src/network/net_util.rs +++ b/crates/core/src/network/net_util.rs @@ -24,7 +24,7 @@ use tokio_util::net::Listener; use tonic::transport::{Channel, Endpoint}; use tracing::{debug, error_span, info, instrument, trace, Instrument, Span}; -use restate_types::config::{Configuration, MetadataStoreClientOptions, NetworkingOptions}; +use restate_types::config::{Configuration, MetadataClientOptions, NetworkingOptions}; use restate_types::errors::GenericError; use restate_types::net::{AdvertisedAddress, BindAddress}; @@ -292,9 +292,9 @@ impl CommonClientConnectionOptions for NetworkingOptions { } } -impl CommonClientConnectionOptions for MetadataStoreClientOptions { +impl CommonClientConnectionOptions for MetadataClientOptions { fn connect_timeout(&self) -> Duration { - self.metadata_store_connect_timeout.into() + self.connect_timeout.into() } fn request_timeout(&self) -> Option { @@ -302,11 +302,11 @@ impl CommonClientConnectionOptions for MetadataStoreClientOptions { } fn keep_alive_interval(&self) -> Duration { - self.metadata_store_keep_alive_interval.into() + self.keep_alive_interval.into() } fn keep_alive_timeout(&self) -> Duration { - self.metadata_store_keep_alive_timeout.into() + self.keep_alive_timeout.into() } fn http2_adaptive_window(&self) -> bool { diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 07714d1c91..558caa711b 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -24,7 +24,7 @@ use restate_types::partition_table::PartitionReplication; use restate_types::protobuf::common::MetadataServerStatus; use restate_types::retries::RetryPolicy; use restate_types::{ - config::{Configuration, MetadataStoreClient}, + config::{Configuration, MetadataClientKind}, errors::GenericError, metadata_store::keys::NODES_CONFIG_KEY, net::{AdvertisedAddress, BindAddress}, @@ -132,12 +132,8 @@ impl Node { &self.base_config.common.advertised_address } - pub fn metadata_store_client_mut(&mut self) -> &mut MetadataStoreClient { - &mut self - .base_config - .common - .metadata_store_client - .metadata_store_client + pub fn metadata_store_client_mut(&mut self) -> &mut MetadataClientKind { + &mut self.base_config.common.metadata_client.kind } pub fn config(&self) -> &Configuration { @@ -231,7 +227,7 @@ impl Node { // update nodes with the addresses of the other nodes for node in &mut nodes { - *node.metadata_store_client_mut() = MetadataStoreClient::Embedded { + *node.metadata_store_client_mut() = MetadataClientKind::Native { addresses: node_addresses.clone(), } } @@ -251,11 +247,8 @@ impl Node { let base_dir = base_dir.into(); // ensure file paths are relative to the base dir - if let MetadataStoreClient::Embedded { addresses } = &mut self - .base_config - .common - .metadata_store_client - .metadata_store_client + if let MetadataClientKind::Native { addresses } = + &mut self.base_config.common.metadata_client.kind { for advertised_address in addresses { if let AdvertisedAddress::Uds(file) = advertised_address { @@ -721,8 +714,7 @@ impl StartedNode { pub async fn metadata_client( &self, ) -> Result { - restate_metadata_server::create_client(self.config().common.metadata_store_client.clone()) - .await + restate_metadata_server::create_client(self.config().common.metadata_client.clone()).await } /// Check to see if the admin address is healthy. Returns false if this node has no admin role. diff --git a/crates/metadata-server/src/grpc/client.rs b/crates/metadata-server/src/grpc/client.rs index 8f60be9d80..e7e9c44c7a 100644 --- a/crates/metadata-server/src/grpc/client.rs +++ b/crates/metadata-server/src/grpc/client.rs @@ -22,7 +22,7 @@ use restate_core::metadata_store::{ }; use restate_core::network::net_util::create_tonic_channel; use restate_core::{cancellation_watcher, Metadata, TaskCenter, TaskKind}; -use restate_types::config::{Configuration, MetadataStoreClientOptions}; +use restate_types::config::{Configuration, MetadataClientOptions}; use restate_types::net::metadata::MetadataKind; use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::{MetadataServerState, NodesConfiguration, Role}; @@ -70,7 +70,7 @@ pub struct GrpcMetadataServerClient { impl GrpcMetadataServerClient { pub fn new( metadata_store_addresses: Vec, - client_options: MetadataStoreClientOptions, + client_options: MetadataClientOptions, ) -> Self { let channel_manager = ChannelManager::new(metadata_store_addresses, client_options); let svc_client = Arc::new(Mutex::new( @@ -362,13 +362,13 @@ impl StatusError { #[derive(Clone, Debug)] struct ChannelManager { channels: Arc>, - client_options: MetadataStoreClientOptions, + client_options: MetadataClientOptions, } impl ChannelManager { fn new( initial_addresses: Vec, - client_options: MetadataStoreClientOptions, + client_options: MetadataClientOptions, ) -> Self { let initial_channels: Vec<_> = initial_addresses .into_iter() diff --git a/crates/metadata-server/src/lib.rs b/crates/metadata-server/src/lib.rs index 74cab455f6..e4cf11b500 100644 --- a/crates/metadata-server/src/lib.rs +++ b/crates/metadata-server/src/lib.rs @@ -32,8 +32,7 @@ pub use restate_core::metadata_store::{ use restate_core::network::NetworkServerBuilder; use restate_core::{MetadataWriter, ShutdownError}; use restate_types::config::{ - Configuration, MetadataServerKind, MetadataServerOptions, MetadataStoreClientOptions, - RocksDbOptions, + Configuration, MetadataClientOptions, MetadataServerKind, MetadataServerOptions, RocksDbOptions, }; use restate_types::errors::GenericError; use restate_types::health::HealthStatus; @@ -613,25 +612,21 @@ impl Default for MetadataServerConfiguration { /// Creates a [`MetadataStoreClient`] for configured metadata store. pub async fn create_client( - metadata_store_client_options: MetadataStoreClientOptions, + metadata_store_client_options: MetadataClientOptions, ) -> Result { - let backoff_policy = Some( - metadata_store_client_options - .metadata_store_client_backoff_policy - .clone(), - ); - - let client = match metadata_store_client_options.metadata_store_client.clone() { - config::MetadataStoreClient::Embedded { addresses } => { + let backoff_policy = Some(metadata_store_client_options.backoff_policy.clone()); + + let client = match metadata_store_client_options.kind.clone() { + config::MetadataClientKind::Native { addresses } => { let inner_client = GrpcMetadataServerClient::new(addresses, metadata_store_client_options); MetadataStoreClient::new(inner_client, backoff_policy) } - config::MetadataStoreClient::Etcd { addresses } => { + config::MetadataClientKind::Etcd { addresses } => { let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?; MetadataStoreClient::new(store, backoff_policy) } - conf @ config::MetadataStoreClient::ObjectStore { .. } => { + conf @ config::MetadataClientKind::ObjectStore { .. } => { let store = create_object_store_based_meta_store(conf).await?; MetadataStoreClient::new(store, backoff_policy) } diff --git a/crates/metadata-server/src/local/tests.rs b/crates/metadata-server/src/local/tests.rs index 5075fe102e..97fb9a42b8 100644 --- a/crates/metadata-server/src/local/tests.rs +++ b/crates/metadata-server/src/local/tests.rs @@ -17,8 +17,8 @@ use restate_core::network::{FailingConnector, NetworkServerBuilder}; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; use restate_rocksdb::RocksDbManager; use restate_types::config::{ - self, reset_base_temp_dir_and_retain, Configuration, MetadataServerOptions, - MetadataStoreClientOptions, RocksDbOptions, + self, reset_base_temp_dir_and_retain, Configuration, MetadataClientOptions, + MetadataServerOptions, RocksDbOptions, }; use restate_types::health::HealthStatus; use restate_types::live::{BoxedLiveLoad, Live}; @@ -209,7 +209,7 @@ async fn durable_storage() -> anyhow::Result<()> { // reset RocksDbManager to allow restarting the metadata store RocksDbManager::get().reset().await?; - let metadata_store_client_opts = MetadataStoreClientOptions::default(); + let metadata_store_client_opts = MetadataClientOptions::default(); let metadata_store_opts = opts.clone(); let metadata_store_opts = Live::from_value(metadata_store_opts); let client = start_metadata_server( @@ -258,7 +258,7 @@ async fn create_test_environment( RocksDbManager::init(config.clone().map(|c| &c.common)); let client = start_metadata_server( - config.pinned().common.metadata_store_client.clone(), + config.pinned().common.metadata_client.clone(), &config.pinned().metadata_server, config.clone().map(|c| &c.metadata_server.rocksdb).boxed(), ) @@ -268,7 +268,7 @@ async fn create_test_environment( } async fn start_metadata_server( - mut metadata_store_client_options: MetadataStoreClientOptions, + mut metadata_store_client_options: MetadataClientOptions, opts: &MetadataServerOptions, updateables_rocksdb_options: BoxedLiveLoad, ) -> anyhow::Result { @@ -283,7 +283,7 @@ async fn start_metadata_server( let uds = tempfile::tempdir()?.into_path().join("metadata-rpc-server"); let bind_address = BindAddress::Uds(uds.clone()); - metadata_store_client_options.metadata_store_client = config::MetadataStoreClient::Embedded { + metadata_store_client_options.kind = config::MetadataClientKind::Native { addresses: vec![AdvertisedAddress::Uds(uds)], }; @@ -307,8 +307,8 @@ async fn start_metadata_server( )?; assert2::let_assert!( - config::MetadataStoreClient::Embedded { addresses } = - metadata_store_client_options.metadata_store_client.clone() + config::MetadataClientKind::Native { addresses } = + metadata_store_client_options.kind.clone() ); rpc_server_health_status @@ -319,7 +319,7 @@ async fn start_metadata_server( GrpcMetadataServerClient::new(addresses, metadata_store_client_options.clone()); let client = MetadataStoreClient::new( grpc_client, - Some(metadata_store_client_options.metadata_store_client_backoff_policy), + Some(metadata_store_client_options.backoff_policy), ); Ok(client) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index e51079a168..2128bc795a 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -145,7 +145,7 @@ impl Node { cluster_marker::validate_and_update_cluster_marker(config.common.cluster_name())?; let metadata_store_client = - restate_metadata_server::create_client(config.common.metadata_store_client.clone()) + restate_metadata_server::create_client(config.common.metadata_client.clone()) .await .map_err(BuildError::MetadataStoreClient)?; let metadata_manager = diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index c327f0355f..381bc28c98 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -110,8 +110,7 @@ pub struct CommonOptions { #[builder(setter(strip_option))] base_dir: Option, - #[serde(flatten)] - pub metadata_store_client: MetadataStoreClientOptions, + pub metadata_client: MetadataClientOptions, /// Address to bind for the Node server. Derived from the advertised address, defaulting /// to `0.0.0.0:$PORT` (where the port will be inferred from the URL scheme). @@ -405,12 +404,12 @@ impl Default for CommonOptions { location: None, force_node_id: None, cluster_name: "localcluster".to_owned(), - // boot strap the cluster by default. This is very likely to change in the future to be + // auto provision the cluster by default. This is very likely to change in the future to be // false by default. For now, this is true to make the converged deployment backward // compatible and easy for users. auto_provision: true, base_dir: None, - metadata_store_client: MetadataStoreClientOptions::default(), + metadata_client: MetadataClientOptions::default(), bind_address: None, advertised_address: AdvertisedAddress::from_str(DEFAULT_ADVERTISED_ADDRESS).unwrap(), default_num_partitions: NonZeroU16::new(24).expect("is not zero"), @@ -497,44 +496,65 @@ pub enum LogFormat { Json, } -/// # Service Client options +/// # Metadata client options #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr( feature = "schemars", - schemars(rename = "MetadataStoreClientOptions", default) + schemars(rename = "MetadataClientOptions", default) )] #[builder(default)] -#[serde(rename_all = "kebab-case")] -pub struct MetadataStoreClientOptions { - /// # Metadata store +#[serde(rename_all = "kebab-case", default)] +pub struct MetadataClientOptions { + /// # Metadata client type /// - /// Metadata store server kind. - pub metadata_store_client: MetadataStoreClient, + /// Which metadata client type to use for the cluster. + #[serde(flatten)] + pub kind: MetadataClientKind, /// # Connect timeout /// /// TCP connection timeout for connecting to the metadata store. #[serde_as(as = "serde_with::DisplayFromStr")] #[cfg_attr(feature = "schemars", schemars(with = "String"))] - pub metadata_store_connect_timeout: humantime::Duration, + pub connect_timeout: humantime::Duration, /// # Metadata Store Keep Alive Interval #[serde_as(as = "serde_with::DisplayFromStr")] #[cfg_attr(feature = "schemars", schemars(with = "String"))] - pub metadata_store_keep_alive_interval: humantime::Duration, + pub keep_alive_interval: humantime::Duration, /// # Metadata Store Keep Alive Timeout #[serde_as(as = "serde_with::DisplayFromStr")] #[cfg_attr(feature = "schemars", schemars(with = "String"))] - pub metadata_store_keep_alive_timeout: humantime::Duration, + pub keep_alive_timeout: humantime::Duration, - /// # Backoff policy used by the metadata store client + /// # Backoff policy used by the metadata client /// - /// Backoff policy used by the metadata store client when it encounters concurrent - /// modifications. - pub metadata_store_client_backoff_policy: RetryPolicy, + /// Backoff policy used by the metadata client when it encounters concurrent modifications. + pub backoff_policy: RetryPolicy, +} + +impl Default for MetadataClientOptions { + fn default() -> Self { + Self { + kind: MetadataClientKind::Native { + addresses: vec![DEFAULT_ADVERTISED_ADDRESS + .parse() + .expect("valid metadata store address")], + }, + connect_timeout: Duration::from_secs(3).into(), + keep_alive_interval: Duration::from_secs(5).into(), + keep_alive_timeout: Duration::from_secs(5).into(), + backoff_policy: RetryPolicy::exponential( + Duration::from_millis(250), + 2.0, + Some(10), + Some(Duration::from_millis(3000)), + ), + } + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -563,29 +583,30 @@ pub enum ObjectStoreCredentials { tag = "type", rename_all = "kebab-case", rename_all_fields = "kebab-case", - try_from = "MetadataStoreClientShadow" + try_from = "MetadataClientKindShadow" )] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr( feature = "schemars", schemars( - title = "Metadata Store", - description = "Definition of a bootstrap metadata store" + title = "Metadata client type", + description = "The metadata client type to store metadata" ) )] -pub enum MetadataStoreClient { - /// Connects to an embedded metadata store that is run by nodes that run with the MetadataStore role. - Embedded { +pub enum MetadataClientKind { + /// Store metadata on the native metadata store that runs on nodes with the metadata-server role. + Native { #[cfg_attr(feature = "schemars", schemars(with = "Vec"))] addresses: Vec, }, - /// Uses external etcd as metadata store. + /// Store metadata on an external etcd cluster. + /// /// The addresses are formatted as `host:port` Etcd { #[cfg_attr(feature = "schemars", schemars(with = "String"))] addresses: Vec, }, - /// Uses an object store as a metadata store. + /// Store metadata on an external object store. ObjectStore { credentials: ObjectStoreCredentials, @@ -603,40 +624,38 @@ pub enum MetadataStoreClient { )] // TODO(azmy): Remove this Shadow struct once we no longer support // the `address` configuration param. -enum MetadataStoreClientShadow { - /// Connects to an embedded metadata store that is run by nodes that run with the MetadataStore role. - Embedded { +enum MetadataClientKindShadow { + #[serde(alias = "embedded")] + Native { address: Option, addresses: Vec, }, - /// Uses external etcd as metadata store. - /// The addresses are formatted as `host:port` - Etcd { addresses: Vec }, - /// Uses an object store as a metadata store. + Etcd { + addresses: Vec, + }, ObjectStore { credentials: ObjectStoreCredentials, - /// # The bucket name to use for storage bucket: String, }, } -impl TryFrom for MetadataStoreClient { +impl TryFrom for MetadataClientKind { type Error = &'static str; - fn try_from(value: MetadataStoreClientShadow) -> Result { + fn try_from(value: MetadataClientKindShadow) -> Result { let result = match value { - MetadataStoreClientShadow::ObjectStore { + MetadataClientKindShadow::ObjectStore { credentials, bucket, } => Self::ObjectStore { credentials, bucket, }, - MetadataStoreClientShadow::Etcd { addresses } => Self::Etcd { addresses }, - MetadataStoreClientShadow::Embedded { address, addresses } => { + MetadataClientKindShadow::Etcd { addresses } => Self::Etcd { addresses }, + MetadataClientKindShadow::Native { address, addresses } => { let default_address: AdvertisedAddress = DEFAULT_ADVERTISED_ADDRESS.parse().unwrap(); - Self::Embedded { + Self::Native { addresses: match address { Some(address) if addresses == vec![default_address] => vec![address], Some(_) => return Err("Conflicting configuration, embedded metadata-store-client cannot have both `address` and `addresses`"), @@ -650,27 +669,6 @@ impl TryFrom for MetadataStoreClient { } } -impl Default for MetadataStoreClientOptions { - fn default() -> Self { - Self { - metadata_store_client: MetadataStoreClient::Embedded { - addresses: vec![DEFAULT_ADVERTISED_ADDRESS - .parse() - .expect("valid metadata store address")], - }, - metadata_store_connect_timeout: Duration::from_secs(3).into(), - metadata_store_keep_alive_interval: Duration::from_secs(5).into(), - metadata_store_keep_alive_timeout: Duration::from_secs(5).into(), - metadata_store_client_backoff_policy: RetryPolicy::exponential( - Duration::from_millis(250), - 2.0, - Some(10), - Some(Duration::from_millis(3000)), - ), - } - } -} - #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "kebab-case")] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] @@ -768,8 +766,6 @@ pub struct CommonOptionsShadow { force_node_id: Option, cluster_name: String, base_dir: Option, - #[serde(flatten)] - metadata_store_client: MetadataStoreClientOptions, bind_address: Option, advertised_address: AdvertisedAddress, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -806,6 +802,21 @@ pub struct CommonOptionsShadow { initialization_timeout: humantime::Duration, disable_telemetry: bool, + metadata_client: MetadataClientOptions, + // todo drop in version 1.3 + metadata_store_client: Option, + #[serde_as(as = "Option")] + // todo drop in version 1.3 + metadata_store_connect_timeout: Option, + #[serde_as(as = "Option")] + // todo drop in version 1.3 + metadata_store_keep_alive_interval: Option, + #[serde_as(as = "Option")] + // todo drop in version 1.3 + metadata_store_keep_alive_timeout: Option, + // todo drop in version 1.3 + metadata_store_backoff_policy: Option, + auto_provision: bool, // todo drop in version 1.3 allow_bootstrap: Option, @@ -841,6 +852,48 @@ impl From for CommonOptions { }) .unwrap_or(value.default_num_partitions); + let mut metadata_client = value + .metadata_store_client + .inspect(|_| { + print_warning_deprecated_config_option( + "metadata-store-client", + Some("metadata-client"), + ); + }) + .unwrap_or(value.metadata_client); + + if let Some(backoff_policy) = value.metadata_store_backoff_policy { + print_warning_deprecated_config_option( + "metadata-store-backoff-policy", + Some("metadata-client.backoff-policy"), + ); + metadata_client.backoff_policy = backoff_policy; + } + + if let Some(connect_timeout) = value.metadata_store_connect_timeout { + print_warning_deprecated_config_option( + "metadata-store-connect-timeout", + Some("metadata-client.connect-timeout"), + ); + metadata_client.connect_timeout = connect_timeout; + } + + if let Some(keep_alive_interval) = value.metadata_store_keep_alive_interval { + print_warning_deprecated_config_option( + "metadata-store-keep-alive-interval", + Some("metadata-client.keep-alive-interval"), + ); + metadata_client.keep_alive_interval = keep_alive_interval; + } + + if let Some(keep_alive_timeout) = value.metadata_store_keep_alive_timeout { + print_warning_deprecated_config_option( + "metadata-store-keep-alive-timeout", + Some("metadata-client.keep-alive-timeout"), + ); + metadata_client.keep_alive_timeout = keep_alive_timeout; + } + CommonOptions { roles: value.roles, node_name: value.node_name, @@ -848,7 +901,7 @@ impl From for CommonOptions { force_node_id: value.force_node_id, cluster_name: value.cluster_name, base_dir: value.base_dir, - metadata_store_client: value.metadata_store_client, + metadata_client, bind_address: value.bind_address, advertised_address: value.advertised_address, shutdown_timeout: value.shutdown_timeout, diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 3e04654e28..daceda694e 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -64,11 +64,7 @@ async fn replicated_loglet_client( .set_cluster_name(cluster.cluster_name().to_owned()); config.common.advertised_address = AdvertisedAddress::Uds(node_socket.clone()); config.common.bind_address = Some(BindAddress::Uds(node_socket.clone())); - config.common.metadata_store_client = cluster.nodes[0] - .config() - .common - .metadata_store_client - .clone(); + config.common.metadata_client = cluster.nodes[0].config().common.metadata_client.clone(); restate_types::config::set_current_config(config.clone()); diff --git a/server/tests/raft_metadata_cluster.rs b/server/tests/raft_metadata_cluster.rs index 4c5ea8f6bf..30afb71e7e 100644 --- a/server/tests/raft_metadata_cluster.rs +++ b/server/tests/raft_metadata_cluster.rs @@ -20,7 +20,7 @@ use restate_local_cluster_runner::node::{BinarySource, HealthCheck, Node}; use restate_metadata_server::create_client; use restate_metadata_server::tests::Value; use restate_types::config::{ - Configuration, MetadataServerKind, MetadataStoreClient, MetadataStoreClientOptions, RaftOptions, + Configuration, MetadataClientKind, MetadataClientOptions, MetadataServerKind, RaftOptions, }; use restate_types::Versioned; use std::num::NonZeroUsize; @@ -54,9 +54,9 @@ async fn raft_metadata_cluster_smoke_test() -> googletest::Result<()> { .map(|node| node.node_address().clone()) .collect(); - let metadata_store_client_options = MetadataStoreClientOptions { - metadata_store_client: MetadataStoreClient::Embedded { addresses }, - ..MetadataStoreClientOptions::default() + let metadata_store_client_options = MetadataClientOptions { + kind: MetadataClientKind::Native { addresses }, + ..MetadataClientOptions::default() }; let client = create_client(metadata_store_client_options) .await @@ -148,9 +148,9 @@ async fn raft_metadata_cluster_chaos_test() -> googletest::Result<()> { .map(|node| node.node_address().clone()) .collect(); - let metadata_store_client_options = MetadataStoreClientOptions { - metadata_store_client: MetadataStoreClient::Embedded { addresses }, - ..MetadataStoreClientOptions::default() + let metadata_store_client_options = MetadataClientOptions { + kind: MetadataClientKind::Native { addresses }, + ..MetadataClientOptions::default() }; let client = create_client(metadata_store_client_options) .await diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs index 7314045f9a..c026246281 100644 --- a/server/tests/trim_gap_handling.rs +++ b/server/tests/trim_gap_handling.rs @@ -29,7 +29,7 @@ use restate_local_cluster_runner::{ cluster::Cluster, node::{BinarySource, Node}, }; -use restate_types::config::{LogFormat, MetadataStoreClient}; +use restate_types::config::{LogFormat, MetadataClientKind}; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::ProviderKind::Replicated; use restate_types::logs::{LogId, Lsn}; @@ -147,7 +147,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), ); - *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + *worker_3.metadata_store_client_mut() = MetadataClientKind::Native { addresses: vec![cluster.nodes[0].node_address().clone()], }; @@ -172,7 +172,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> { BinarySource::CargoTest, enum_set!(Role::HttpIngress | Role::Worker), ); - *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + *worker_3.metadata_store_client_mut() = MetadataClientKind::Native { addresses: vec![cluster.nodes[0].node_address().clone()], }; diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index 164bf79727..daf6a04088 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -75,7 +75,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { TaskCenter::try_set_global_metadata(metadata.clone()); let metadata_store_client = metadata_store::start_metadata_server( - config.common.metadata_store_client.clone(), + config.common.metadata_client.clone(), &config.metadata_server, Live::from_value(config.metadata_server.clone()) .map(|c| &c.rocksdb) diff --git a/tools/restatectl/src/commands/metadata/get.rs b/tools/restatectl/src/commands/metadata/get.rs index b86d2df9b3..241821b8e0 100644 --- a/tools/restatectl/src/commands/metadata/get.rs +++ b/tools/restatectl/src/commands/metadata/get.rs @@ -66,7 +66,7 @@ async fn get_value_direct(opts: &GetValueOpts) -> anyhow::Result restate_types::config::MetadataStoreClient::Etcd { + RemoteServiceType::Etcd => restate_types::config::MetadataClientKind::Etcd { addresses: opts.etcd.clone(), }, }; - let metadata_store_client_options = MetadataStoreClientOptions { - metadata_store_client: client, - ..MetadataStoreClientOptions::default() + let metadata_store_client_options = MetadataClientOptions { + kind: client, + ..MetadataClientOptions::default() }; create_client(metadata_store_client_options) diff --git a/tools/restatectl/src/commands/metadata/patch.rs b/tools/restatectl/src/commands/metadata/patch.rs index 30cd981638..2339dd4209 100644 --- a/tools/restatectl/src/commands/metadata/patch.rs +++ b/tools/restatectl/src/commands/metadata/patch.rs @@ -90,7 +90,7 @@ async fn patch_value_direct( debug!("RocksDB Initialized"); let metadata_store_client = start_metadata_server( - config.common.metadata_store_client.clone(), + config.common.metadata_client.clone(), &config.metadata_server, Live::from_value(config.metadata_server.clone()) .map(|c| &c.rocksdb) diff --git a/tools/restatectl/src/environment/metadata_store.rs b/tools/restatectl/src/environment/metadata_store.rs index fef5df7d02..2339263ae6 100644 --- a/tools/restatectl/src/environment/metadata_store.rs +++ b/tools/restatectl/src/environment/metadata_store.rs @@ -15,14 +15,14 @@ use restate_core::network::NetworkServerBuilder; use restate_core::{TaskCenter, TaskKind}; use restate_metadata_server::MetadataServer; use restate_types::config; -use restate_types::config::{MetadataServerOptions, MetadataStoreClientOptions, RocksDbOptions}; +use restate_types::config::{MetadataClientOptions, MetadataServerOptions, RocksDbOptions}; use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::protobuf::common::NodeRpcStatus; pub async fn start_metadata_server( - mut metadata_store_client_options: MetadataStoreClientOptions, + mut metadata_store_client_options: MetadataClientOptions, opts: &MetadataServerOptions, updateables_rocksdb_options: BoxedLiveLoad, ) -> anyhow::Result { @@ -40,7 +40,7 @@ pub async fn start_metadata_server( // right now we only support running a local metadata store let uds = tempfile::tempdir()?.into_path().join("metadata-rpc-server"); let bind_address = BindAddress::Uds(uds.clone()); - metadata_store_client_options.metadata_store_client = config::MetadataStoreClient::Embedded { + metadata_store_client_options.kind = config::MetadataClientKind::Native { addresses: vec![AdvertisedAddress::Uds(uds)], };