diff --git a/Cargo.lock b/Cargo.lock index ea29836860..45c78b7778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7028,6 +7028,7 @@ dependencies = [ "arc-swap", "async-trait", "bytes", + "bytestring", "clap", "codederror", "derive_builder", @@ -7040,6 +7041,7 @@ dependencies = [ "hyper-util", "mock-service-endpoint", "pin-project", + "rand", "regex", "reqwest", "restate-admin", @@ -7048,6 +7050,7 @@ dependencies = [ "restate-errors", "restate-fs-util", "restate-local-cluster-runner", + "restate-metadata-server", "restate-node", "restate-rocksdb", "restate-service-client", diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 517f2e3e2b..46fa85f3cc 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -182,7 +182,14 @@ impl Node { let mut nodes = Vec::with_capacity(usize::try_from(size).expect("u32 to fit into usize")); base_config.common.allow_bootstrap = false; - base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default()); + base_config.common.log_disable_ansi_codes = true; + if !matches!( + base_config.metadata_server.kind, + MetadataServerKind::Raft(_) + ) { + info!("Setting the metadata server to embedded"); + base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default()); + } for node_id in 1..=size { let mut effective_config = base_config.clone(); @@ -295,7 +302,7 @@ impl Node { inherit_env, env, searcher, - } = self; + } = &self; let node_base_dir = std::path::absolute( base_config @@ -332,7 +339,7 @@ impl Node { .await .map_err(NodeStartError::CreateLog)?; - let binary_path: OsString = binary_source.try_into()?; + let binary_path: OsString = binary_source.clone().try_into()?; let mut cmd = Command::new(&binary_path); if !inherit_env { @@ -342,13 +349,13 @@ impl Node { } .env("RESTATE_CONFIG", node_config_file) .env("DO_NOT_TRACK", "true") // avoid sending telemetry as part of tests - .envs(env) + .envs(env.clone()) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true) .process_group(0) // avoid terminal control C being propagated - .args(&args); + .args(args); let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?; let pid = child.id().expect("child to have a pid"); @@ -429,10 +436,10 @@ impl Node { log_file: node_log_filename, status: StartedNodeStatus::Running { child_handle, - searcher, + searcher: searcher.clone(), pid, }, - config: base_config, + node: Some(self), }) } @@ -518,7 +525,7 @@ impl TryInto for BinarySource { pub struct StartedNode { log_file: PathBuf, status: StartedNodeStatus, - config: Configuration, + node: Option, } enum StartedNodeStatus { @@ -561,18 +568,17 @@ impl StartedNode { StartedNodeStatus::Exited(status) => Ok(status), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. } => { - info!( - "Sending SIGKILL to node {} (pid {})", - self.config.node_name(), - pid - ); + info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid); match nix::sys::signal::kill( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGKILL, ) { Ok(()) => (&mut self.status).await, Err(errno) => match errno { - nix::errno::Errno::ESRCH => Ok(ExitStatus::default()), // ignore "no such process" + nix::errno::Errno::ESRCH => { + self.status = StartedNodeStatus::Exited(ExitStatus::default()); + Ok(ExitStatus::default()) + } // ignore "no such process" _ => Err(io::Error::from_raw_os_error(errno as i32)), }, } @@ -586,11 +592,7 @@ impl StartedNode { StartedNodeStatus::Exited(_) => Ok(()), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. } => { - info!( - "Sending SIGTERM to node {} (pid {})", - self.config.node_name(), - pid - ); + info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid); match nix::sys::signal::kill( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGTERM, @@ -598,7 +600,7 @@ impl StartedNode { Err(nix::errno::Errno::ESRCH) => { warn!( "Node {} server process (pid {}) did not exist when sending SIGTERM", - self.config.node_name(), + self.node_name(), pid ); Ok(()) @@ -610,6 +612,17 @@ impl StartedNode { } } + pub async fn restart(&mut self) -> anyhow::Result<()> { + info!("Restarting node '{}'", self.config().node_name()); + self.kill().await?; + assert!( + !matches!(self.status, StartedNodeStatus::Running { .. }), + "Node should not be in status running after killing it." + ); + *self = self.node.take().expect("to be present").start().await?; + Ok(()) + } + /// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result { match self.status { @@ -651,7 +664,7 @@ impl StartedNode { } pub fn config(&self) -> &Configuration { - &self.config + &self.node.as_ref().expect("to be present").base_config } pub fn node_name(&self) -> &str { @@ -708,10 +721,8 @@ impl StartedNode { pub async fn metadata_client( &self, ) -> Result { - restate_metadata_server::local::create_client( - self.config.common.metadata_store_client.clone(), - ) - .await + restate_metadata_server::create_client(self.config().common.metadata_store_client.clone()) + .await } /// Check to see if the admin address is healthy. Returns false if this node has no admin role. @@ -766,8 +777,8 @@ impl StartedNode { /// Check to see if the metadata server has joined the metadata cluster. pub async fn metadata_server_joined_cluster(&self) -> bool { let mut metadata_server_client = MetadataServerSvcClient::new(create_tonic_channel( - self.config.common.advertised_address.clone(), - &self.config.networking, + self.config().common.advertised_address.clone(), + &self.config().networking, )); let Ok(response) = metadata_server_client @@ -840,7 +851,7 @@ impl Drop for StartedNode { if let StartedNodeStatus::Running { pid, .. } = self.status { warn!( "Node {} (pid {}) dropped without explicit shutdown", - self.config.node_name(), + self.config().node_name(), pid, ); match nix::sys::signal::kill( @@ -850,7 +861,7 @@ impl Drop for StartedNode { Ok(()) | Err(nix::errno::Errno::ESRCH) => {} err => error!( "Failed to send SIGKILL to running node {} (pid {}): {:?}", - self.config.node_name(), + self.config().node_name(), pid, err, ), diff --git a/crates/metadata-server/src/lib.rs b/crates/metadata-server/src/lib.rs index 4d996971b8..8e4b9b1568 100644 --- a/crates/metadata-server/src/lib.rs +++ b/crates/metadata-server/src/lib.rs @@ -10,10 +10,10 @@ pub mod grpc; pub mod local; -mod network; pub mod raft; mod util; +use crate::grpc::client::GrpcMetadataServerClient; use crate::grpc::handler::MetadataStoreHandler; use crate::grpc::metadata_server_svc_server::MetadataServerSvcServer; use assert2::let_assert; @@ -22,6 +22,9 @@ use bytestring::ByteString; use grpc::pb_conversions::ConversionError; use prost::Message; use raft_proto::eraftpb::Snapshot; +use restate_core::metadata_store::providers::{ + create_object_store_based_meta_store, EtcdMetadataStore, +}; use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError, @@ -29,7 +32,8 @@ pub use restate_core::metadata_store::{ use restate_core::network::NetworkServerBuilder; use restate_core::{MetadataWriter, ShutdownError}; use restate_types::config::{ - Configuration, MetadataServerKind, MetadataServerOptions, RocksDbOptions, + Configuration, MetadataServerKind, MetadataServerOptions, MetadataStoreClientOptions, + RocksDbOptions, }; use restate_types::errors::GenericError; use restate_types::health::HealthStatus; @@ -40,7 +44,7 @@ use restate_types::nodes_config::{ }; use restate_types::protobuf::common::MetadataServerStatus; use restate_types::storage::{StorageDecodeError, StorageEncodeError}; -use restate_types::{GenerationalNodeId, PlainNodeId, Version}; +use restate_types::{config, GenerationalNodeId, PlainNodeId, Version}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::future::Future; @@ -661,6 +665,35 @@ impl Default for MetadataServerConfiguration { } } +/// Creates a [`MetadataStoreClient`] for configured metadata store. +pub async fn create_client( + metadata_store_client_options: MetadataStoreClientOptions, +) -> Result { + let backoff_policy = Some( + metadata_store_client_options + .metadata_store_client_backoff_policy + .clone(), + ); + + let client = match metadata_store_client_options.metadata_store_client.clone() { + config::MetadataStoreClient::Embedded { addresses } => { + let inner_client = + GrpcMetadataServerClient::new(addresses, metadata_store_client_options); + MetadataStoreClient::new(inner_client, backoff_policy) + } + config::MetadataStoreClient::Etcd { addresses } => { + let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?; + MetadataStoreClient::new(store, backoff_policy) + } + conf @ config::MetadataStoreClient::ObjectStore { .. } => { + let store = create_object_store_based_meta_store(conf).await?; + MetadataStoreClient::new(store, backoff_policy) + } + }; + + Ok(client) +} + /// Ensures that the initial nodes configuration contains the current node and has the right /// [`MetadataServerState`] set. fn prepare_initial_nodes_configuration( @@ -722,3 +755,46 @@ fn prepare_initial_nodes_configuration( Ok(plain_node_id) } + +#[cfg(any(test, feature = "test-util"))] +pub mod tests { + use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] + pub struct Value { + pub version: Version, + pub value: u32, + } + + impl Default for Value { + fn default() -> Self { + Self { + version: Version::MIN, + value: Default::default(), + } + } + } + + impl Value { + pub fn new(value: u32) -> Self { + Value { + value, + ..Value::default() + } + } + + pub fn next_version(mut self) -> Self { + self.version = self.version.next(); + self + } + } + + impl Versioned for Value { + fn version(&self) -> Version { + self.version + } + } + + flexbuffers_storage_encode_decode!(Value); +} diff --git a/crates/metadata-server/src/local/mod.rs b/crates/metadata-server/src/local/mod.rs index 22343aff50..32f946860a 100644 --- a/crates/metadata-server/src/local/mod.rs +++ b/crates/metadata-server/src/local/mod.rs @@ -8,54 +8,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::grpc::client::GrpcMetadataServerClient; -use restate_core::metadata_store::providers::create_object_store_based_meta_store; -use restate_core::metadata_store::{providers::EtcdMetadataStore, MetadataStoreClient}; use restate_core::network::NetworkServerBuilder; use restate_rocksdb::RocksError; use restate_types::config::{MetadataServerOptions, RocksDbOptions}; use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; use restate_types::protobuf::common::MetadataServerStatus; -use restate_types::{ - config::{MetadataStoreClient as MetadataStoreClientConfig, MetadataStoreClientOptions}, - errors::GenericError, -}; mod store; use crate::MetadataServerRunner; pub use store::LocalMetadataServer; -/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataServerClient`]. -pub async fn create_client( - metadata_store_client_options: MetadataStoreClientOptions, -) -> Result { - let backoff_policy = Some( - metadata_store_client_options - .metadata_store_client_backoff_policy - .clone(), - ); - - let client = match metadata_store_client_options.metadata_store_client.clone() { - MetadataStoreClientConfig::Embedded { addresses } => { - let inner_client = - GrpcMetadataServerClient::new(addresses, metadata_store_client_options); - MetadataStoreClient::new(inner_client, backoff_policy) - } - MetadataStoreClientConfig::Etcd { addresses } => { - let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?; - MetadataStoreClient::new(store, backoff_policy) - } - conf @ MetadataStoreClientConfig::ObjectStore { .. } => { - let store = create_object_store_based_meta_store(conf).await?; - MetadataStoreClient::new(store, backoff_policy) - } - }; - - Ok(client) -} - pub(crate) async fn create_server( metadata_server_options: &MetadataServerOptions, rocksdb_options: BoxedLiveLoad, diff --git a/crates/metadata-server/src/local/tests.rs b/crates/metadata-server/src/local/tests.rs index bead19d6a9..debfc27376 100644 --- a/crates/metadata-server/src/local/tests.rs +++ b/crates/metadata-server/src/local/tests.rs @@ -11,7 +11,6 @@ use bytestring::ByteString; use futures::stream::FuturesUnordered; use futures::StreamExt; -use serde::{Deserialize, Serialize}; use test_log::test; use restate_core::network::{FailingConnector, NetworkServerBuilder}; @@ -25,42 +24,13 @@ use restate_types::health::HealthStatus; use restate_types::live::{BoxedLiveLoad, Live}; use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::protobuf::common::NodeRpcStatus; -use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; +use restate_types::{Version, Versioned}; use crate::grpc::client::GrpcMetadataServerClient; use crate::local::LocalMetadataServer; +use crate::tests::Value; use crate::{MetadataServer, MetadataServerRunner, MetadataStoreClient, Precondition, WriteError}; -#[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] -struct Value { - version: Version, - value: String, -} - -impl Default for Value { - fn default() -> Self { - Self { - version: Version::MIN, - value: Default::default(), - } - } -} - -impl Value { - fn next_version(mut self) -> Self { - self.version = self.version.next(); - self - } -} - -impl Versioned for Value { - fn version(&self) -> Version { - self.version - } -} - -flexbuffers_storage_encode_decode!(Value); - /// Tests basic operations of the metadata store. #[test(restate_core::test(flavor = "multi_thread", worker_threads = 2))] async fn basic_metadata_store_operations() -> anyhow::Result<()> { @@ -69,17 +39,17 @@ async fn basic_metadata_store_operations() -> anyhow::Result<()> { let key: ByteString = "key".into(); let value = Value { version: Version::MIN, - value: "test_value".to_owned(), + value: 1, }; let next_value = Value { version: Version::from(2), - value: "next_value".to_owned(), + value: 2, }; let other_value = Value { version: Version::MIN, - value: "other_value".to_owned(), + value: 3, }; // first get should be empty @@ -227,7 +197,7 @@ async fn durable_storage() -> anyhow::Result<()> { metadata_key, &Value { version: Version::from(key), - value, + value: key, }, Precondition::DoesNotExist, ) @@ -258,7 +228,7 @@ async fn durable_storage() -> anyhow::Result<()> { client.get(metadata_key).await?, Some(Value { version: Version::from(key), - value + value: key, }) ); } diff --git a/crates/metadata-server/src/raft/mod.rs b/crates/metadata-server/src/raft/mod.rs index eed9c5d64b..f508e988a0 100644 --- a/crates/metadata-server/src/raft/mod.rs +++ b/crates/metadata-server/src/raft/mod.rs @@ -9,14 +9,15 @@ // by the Apache License, Version 2.0. mod kv_memory_storage; +mod network; mod storage; mod store; -use crate::network::{MetadataServerNetworkSvcServer, MetadataStoreNetworkHandler, NetworkMessage}; use crate::raft::store::BuildError; -use crate::{network, MemberId, MetadataServerRunner}; +use crate::{MemberId, MetadataServerRunner}; use anyhow::Context; use bytes::{Buf, BufMut}; +use network::{MetadataServerNetworkSvcServer, MetadataStoreNetworkHandler, NetworkMessage}; use protobuf::Message as ProtobufMessage; use restate_core::network::NetworkServerBuilder; use restate_core::MetadataWriter; diff --git a/crates/metadata-server/src/network/connection_manager.rs b/crates/metadata-server/src/raft/network/connection_manager.rs similarity index 98% rename from crates/metadata-server/src/network/connection_manager.rs rename to crates/metadata-server/src/raft/network/connection_manager.rs index 1c12d5871b..d5e56c0266 100644 --- a/crates/metadata-server/src/network/connection_manager.rs +++ b/crates/metadata-server/src/raft/network/connection_manager.rs @@ -8,8 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::network::grpc_svc; -use crate::network::NetworkMessage; +use crate::raft::network::{grpc_svc, NetworkMessage}; use futures::StreamExt; use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use std::collections::HashMap; diff --git a/crates/metadata-server/src/network/grpc_svc.rs b/crates/metadata-server/src/raft/network/grpc_svc.rs similarity index 100% rename from crates/metadata-server/src/network/grpc_svc.rs rename to crates/metadata-server/src/raft/network/grpc_svc.rs diff --git a/crates/metadata-server/src/network/handler.rs b/crates/metadata-server/src/raft/network/handler.rs similarity index 94% rename from crates/metadata-server/src/network/handler.rs rename to crates/metadata-server/src/raft/network/handler.rs index be4b38be5e..4e29c6373b 100644 --- a/crates/metadata-server/src/network/handler.rs +++ b/crates/metadata-server/src/raft/network/handler.rs @@ -8,10 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::network::connection_manager::ConnectionError; -use crate::network::grpc_svc::metadata_server_network_svc_server::MetadataServerNetworkSvc; -use crate::network::grpc_svc::JoinClusterRequest; -use crate::network::{grpc_svc, ConnectionManager, NetworkMessage}; +use crate::raft::network::connection_manager::ConnectionError; +use crate::raft::network::grpc_svc::metadata_server_network_svc_server::MetadataServerNetworkSvc; +use crate::raft::network::grpc_svc::JoinClusterRequest; +use crate::raft::network::{grpc_svc, ConnectionManager, NetworkMessage}; use crate::{JoinClusterError, JoinClusterHandle}; use arc_swap::access::Access; use arc_swap::ArcSwapOption; diff --git a/crates/metadata-server/src/network/mod.rs b/crates/metadata-server/src/raft/network/mod.rs similarity index 100% rename from crates/metadata-server/src/network/mod.rs rename to crates/metadata-server/src/raft/network/mod.rs diff --git a/crates/metadata-server/src/network/networking.rs b/crates/metadata-server/src/raft/network/networking.rs similarity index 97% rename from crates/metadata-server/src/network/networking.rs rename to crates/metadata-server/src/raft/network/networking.rs index 282253ac3d..79f5209216 100644 --- a/crates/metadata-server/src/network/networking.rs +++ b/crates/metadata-server/src/raft/network/networking.rs @@ -8,9 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::network::connection_manager::ConnectionManager; -use crate::network::grpc_svc; -use crate::network::handler::PEER_METADATA_KEY; +use crate::raft::network::connection_manager::ConnectionManager; +use crate::raft::network::grpc_svc; +use crate::raft::network::handler::PEER_METADATA_KEY; use bytes::{Buf, BufMut, BytesMut}; use futures::FutureExt; use restate_core::network::net_util; diff --git a/crates/metadata-server/src/raft/store.rs b/crates/metadata-server/src/raft/store.rs index f220eee230..593ccc009c 100644 --- a/crates/metadata-server/src/raft/store.rs +++ b/crates/metadata-server/src/raft/store.rs @@ -10,9 +10,9 @@ use crate::grpc::pb_conversions::ConversionError; use crate::grpc::MetadataServerSnapshot; -use crate::network::grpc_svc::metadata_server_network_svc_client::MetadataServerNetworkSvcClient; -use crate::network::{ConnectionManager, Networking}; use crate::raft::kv_memory_storage::KvMemoryStorage; +use crate::raft::network::grpc_svc::metadata_server_network_svc_client::MetadataServerNetworkSvcClient; +use crate::raft::network::{ConnectionManager, Networking}; use crate::raft::storage::RocksDbStorage; use crate::raft::{storage, RaftConfiguration}; use crate::{ @@ -641,7 +641,7 @@ impl Member { self.raw_node.step(raft)?; }, Ok(()) = nodes_config_watch.changed() => { - self.update_node_addresses(&Metadata::with_current(|m| m.nodes_config_ref())); + self.update_node_addresses(); }, _ = self.tick_interval.tick() => { self.raw_node.tick(); @@ -748,7 +748,9 @@ impl Member { return; } - let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let metadata_nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let nodes_config = + Self::latest_nodes_configuration(&self.kv_storage, &metadata_nodes_config); let Ok(joining_node_config) = nodes_config.find_node_by_id(joining_member_id.node_id) else { @@ -1043,7 +1045,7 @@ impl Member { self.answer_join_callbacks(); self.update_leadership(); - self.update_node_addresses(&Metadata::with_current(|m| m.nodes_config_ref())); + self.update_node_addresses(); self.update_status(); Ok(()) @@ -1197,13 +1199,18 @@ impl Member { } } - fn update_node_addresses(&mut self, nodes_configuration: &NodesConfiguration) { + fn update_node_addresses(&mut self) { + let metadata_nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let nodes_config = + Self::latest_nodes_configuration(&self.kv_storage, &metadata_nodes_config); + trace!( "Update node addresses in networking based on NodesConfiguration '{}'", - nodes_configuration.version() + nodes_config.version() ); + for node_id in self.raw_node.raft.prs().conf().voters().ids().iter() { - if let Ok(node_config) = nodes_configuration.find_node_by_id(PlainNodeId::from( + if let Ok(node_config) = nodes_config.find_node_by_id(PlainNodeId::from( u32::try_from(node_id).expect("node id is derived from PlainNodeId"), )) { self.networking @@ -1265,6 +1272,19 @@ impl Member { self.configuration.members.contains_key(&node_id) } + fn latest_nodes_configuration<'a>( + kv_storage: &'a KvMemoryStorage, + metadata_nodes_config: &'a NodesConfiguration, + ) -> &'a NodesConfiguration { + let kv_storage_nodes_config = kv_storage.last_seen_nodes_configuration(); + + if metadata_nodes_config.version() > kv_storage_nodes_config.version() { + metadata_nodes_config + } else { + kv_storage_nodes_config + } + } + /// Returns the known leader from the Raft instance or a random known leader from the /// current nodes configuration. fn known_leader(&self) -> Option { @@ -1274,7 +1294,9 @@ impl Member { let leader = to_plain_node_id(self.raw_node.raft.leader_id); - let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let metadata_nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + let nodes_config = + Self::latest_nodes_configuration(&self.kv_storage, &metadata_nodes_config); nodes_config .find_node_by_id(leader) .ok() @@ -1411,9 +1433,11 @@ impl Standby { Span::current().record("member_id", MemberId::new(my_node_id.unwrap(), self.storage_id).to_string()); } - if matches!(node_config.metadata_server_config.metadata_server_state, MetadataServerState::Member) && join_cluster.is_terminated() { - debug!("Node is part of the metadata store cluster. Trying to join the raft cluster."); - join_cluster.set(Some(Self::join_cluster(None, None, my_node_id.unwrap(), storage_id).fuse()).into()); + if matches!(node_config.metadata_server_config.metadata_server_state, MetadataServerState::Member) { + if join_cluster.is_terminated() { + debug!("Node is part of the metadata store cluster. Trying to join the raft cluster."); + join_cluster.set(Some(Self::join_cluster(None, None, my_node_id.unwrap(), storage_id).fuse()).into()); + } } else { debug!("Node is not part of the metadata store cluster. Waiting to become a candidate."); join_cluster.set(None.into()); @@ -1473,7 +1497,7 @@ impl Standby { .send_compressed(CompressionEncoding::Gzip); if let Err(status) = client - .join_cluster(crate::network::grpc_svc::JoinClusterRequest { + .join_cluster(crate::raft::network::grpc_svc::JoinClusterRequest { node_id: u32::from(my_node_id), storage_id, }) diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index d7fa391220..1fe29310b3 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -148,11 +148,10 @@ impl Node { cluster_marker::validate_and_update_cluster_marker(config.common.cluster_name())?; - let metadata_store_client = restate_metadata_server::local::create_client( - config.common.metadata_store_client.clone(), - ) - .await - .map_err(BuildError::MetadataStoreClient)?; + let metadata_store_client = + restate_metadata_server::create_client(config.common.metadata_store_client.clone()) + .await + .map_err(BuildError::MetadataStoreClient)?; let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); let metadata_writer = metadata_manager.writer(); diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index d607627de9..59f24bf378 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -322,6 +322,11 @@ impl CommonOptions { }) } + #[cfg(any(test, feature = "test-util"))] + pub fn base_dir_opt(&self) -> Option<&PathBuf> { + self.base_dir.as_ref() + } + pub fn rocksdb_actual_total_memtables_size(&self) -> usize { let sanitized = self.rocksdb_total_memtables_ratio.clamp(0.0, 1.0) as f64; let total_mem = self.rocksdb_total_memory_size.get() as f64; diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index 1b384593b5..2bfe3009a3 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -139,6 +139,11 @@ pub fn reset_base_temp_dir_and_retain() -> PathBuf { pub fn set_current_config(config: Configuration) { #[cfg(not(any(test, feature = "test-util")))] let proposed_cwd = config.common.base_dir().join(config.node_name()); + #[cfg(any(test, feature = "test-util"))] + if let Some(base_dir) = config.common.base_dir_opt() { + // overwrite temp directory if an explicit base dir was configured + set_base_temp_dir(base_dir.clone().join(config.node_name())); + } // todo: potentially validate the config CONFIGURATION.store(Arc::new(config)); #[cfg(not(any(test, feature = "test-util")))] diff --git a/crates/types/src/version.rs b/crates/types/src/version.rs index cc44dcfb26..42a8d7edb2 100644 --- a/crates/types/src/version.rs +++ b/crates/types/src/version.rs @@ -35,6 +35,11 @@ impl Version { pub fn next(self) -> Self { Version(self.0 + 1) } + + #[cfg(feature = "test-util")] + pub fn prev(self) -> Self { + Version(self.0.saturating_sub(1)) + } } impl From for Version { diff --git a/server/Cargo.toml b/server/Cargo.toml index c40c4e4f9b..de3aa69128 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -78,6 +78,7 @@ restate-admin = { workspace = true, features = ["memory-loglet", "clients"] } restate-bifrost = { workspace = true, features = ["test-util"] } restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } +restate-metadata-server = { workspace = true, features = ["test-util"] } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } mock-service-endpoint = { workspace = true } @@ -85,6 +86,7 @@ mock-service-endpoint = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true} googletest = { workspace = true } hyper-util = { workspace = true } tempfile = { workspace = true } @@ -92,6 +94,7 @@ test-log = { workspace = true } tonic = { workspace = true, features = ["transport", "prost"] } tower = { workspace = true } tracing-subscriber = { workspace = true } +rand = { workspace = true } reqwest = { workspace = true } serde_json = { workspace = true } url = { workspace = true } diff --git a/server/tests/raft_metadata_cluster.rs b/server/tests/raft_metadata_cluster.rs new file mode 100644 index 0000000000..0bd3bfd891 --- /dev/null +++ b/server/tests/raft_metadata_cluster.rs @@ -0,0 +1,271 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bytestring::ByteString; +use enumset::EnumSet; +use googletest::prelude::err; +use googletest::{assert_that, pat, IntoTestResult}; +use rand::prelude::SliceRandom; +use restate_core::metadata_store::{Precondition, WriteError}; +use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; +use restate_local_cluster_runner::cluster::Cluster; +use restate_local_cluster_runner::node::{BinarySource, HealthCheck, Node}; +use restate_metadata_server::create_client; +use restate_metadata_server::tests::Value; +use restate_types::config::{ + Configuration, MetadataServerKind, MetadataStoreClient, MetadataStoreClientOptions, RaftOptions, +}; +use restate_types::Versioned; +use std::num::NonZeroUsize; +use std::time::{Duration, Instant}; +use tracing::info; + +#[test_log::test(restate_core::test)] +async fn raft_metadata_cluster_smoke_test() -> googletest::Result<()> { + let base_config = Configuration::default(); + + let nodes = Node::new_test_nodes( + base_config, + BinarySource::CargoTest, + EnumSet::empty(), + 3, + true, + ); + let mut cluster = Cluster::builder() + .cluster_name("raft_metadata_cluster_smoke_test") + .nodes(nodes) + .temp_base_dir() + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(30)).await?; + + let addresses = cluster + .nodes + .iter() + .map(|node| node.node_address().clone()) + .collect(); + + let metadata_store_client_options = MetadataStoreClientOptions { + metadata_store_client: MetadataStoreClient::Embedded { addresses }, + ..MetadataStoreClientOptions::default() + }; + let client = create_client(metadata_store_client_options) + .await + .expect("to not fail"); + + let value = Value::new(42); + let value_version = value.version(); + let key = ByteString::from_static("my-key"); + client + .put(key.clone(), &value, Precondition::DoesNotExist) + .await?; + + let stored_value = client.get::(key.clone()).await?; + assert_eq!(stored_value, Some(value)); + + let stored_value_version = client.get_version(key.clone()).await?; + assert_eq!(stored_value_version, Some(value_version)); + + let new_value = Value::new(1337); + let new_value_version = new_value.version(); + assert_that!( + client + .put( + key.clone(), + &new_value, + Precondition::MatchesVersion(value_version.next()), + ) + .await, + err(pat!(WriteError::FailedPrecondition(_))) + ); + assert_that!( + client + .put(key.clone(), &new_value, Precondition::DoesNotExist) + .await, + err(pat!(WriteError::FailedPrecondition(_))) + ); + + client + .put( + key.clone(), + &new_value, + Precondition::MatchesVersion(value_version), + ) + .await?; + let stored_new_value = client.get::(key.clone()).await?; + assert_eq!(stored_new_value, Some(new_value)); + + client + .delete(key.clone(), Precondition::MatchesVersion(new_value_version)) + .await?; + assert!(client.get::(key.clone()).await?.is_none()); + + cluster.graceful_shutdown(Duration::from_secs(3)).await?; + + Ok(()) +} + +#[test_log::test(restate_core::test)] +async fn raft_metadata_cluster_chaos_test() -> googletest::Result<()> { + let num_nodes = 3; + let chaos_duration = Duration::from_secs(20); + let mut base_config = Configuration::default(); + base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions { + raft_election_tick: NonZeroUsize::new(5).expect("5 to be non zero"), + raft_heartbeat_tick: NonZeroUsize::new(2).expect("2 to be non zero"), + ..RaftOptions::default() + }); + + let nodes = Node::new_test_nodes( + base_config, + BinarySource::CargoTest, + EnumSet::empty(), + num_nodes, + true, + ); + let mut cluster = Cluster::builder() + .cluster_name("raft_metadata_cluster_smoke_test") + .nodes(nodes) + .temp_base_dir() + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(30)).await?; + + let addresses = cluster + .nodes + .iter() + .map(|node| node.node_address().clone()) + .collect(); + + let metadata_store_client_options = MetadataStoreClientOptions { + metadata_store_client: MetadataStoreClient::Embedded { addresses }, + ..MetadataStoreClientOptions::default() + }; + let client = create_client(metadata_store_client_options) + .await + .expect("to not fail"); + + let start_chaos = Instant::now(); + + let chaos_handle = TaskCenter::spawn_unmanaged(TaskKind::Background, "chaos", async move { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + + loop { + let node = cluster + .nodes + .choose_mut(&mut rand::thread_rng()) + .expect("at least one node being present"); + + tokio::select! { + _ = &mut shutdown => { + break; + }, + result = node.restart() => { + result?; + // wait until the cluster is healthy again + tokio::select! { + _ = &mut shutdown => { + break; + } + result = cluster.wait_check_healthy(HealthCheck::MetadataServer, Duration::from_secs(10)) => { + result?; + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + + Ok::<_, anyhow::Error>(cluster) + })?; + + let key = ByteString::from_static("my-key"); + let mut current_version = None; + let mut next_value = Value::new(1); + let mut test_state = State::Write; + + info!("Starting the metadata cluster chaos test"); + + while start_chaos.elapsed() < chaos_duration { + match test_state { + State::Write => { + let result = client + .put( + key.clone(), + &next_value, + current_version + .map(Precondition::MatchesVersion) + .unwrap_or(Precondition::DoesNotExist), + ) + .await; + if result.is_err() { + test_state = State::Reconcile; + } else { + current_version = Some(next_value.version()); + next_value = Value { + value: next_value.value + 1, + version: next_value.version.next(), + }; + } + } + State::Reconcile => { + let result = client.get::(key.clone()).await; + + if let Ok(value) = result { + // assert that read value is next_value or next_value - 1 + match value { + None => { + assert_eq!(current_version, None); + assert_eq!(next_value, Value::new(1)); + } + Some(read_value) => { + let previous_value = Value { + value: next_value.value - 1, + version: next_value.version().prev(), + }; + assert!(read_value == next_value || read_value == previous_value); + + current_version = Some(read_value.version()); + next_value = Value { + value: read_value.value + 1, + version: read_value.version.next(), + }; + } + } + + test_state = State::Write; + } + } + } + } + + // make sure that we have written at least some values + assert!(next_value.value > 1); + + info!( + "Finished metadata cluster chaos test with value: {}", + next_value.value + ); + + chaos_handle.cancel(); + let mut cluster = chaos_handle.await?.into_test_result()?; + cluster.graceful_shutdown(Duration::from_secs(3)).await?; + + Ok(()) +} + +enum State { + Write, + Reconcile, +} diff --git a/tools/restatectl/src/commands/metadata/mod.rs b/tools/restatectl/src/commands/metadata/mod.rs index 9f2bc1f3a0..d0987c0db4 100644 --- a/tools/restatectl/src/commands/metadata/mod.rs +++ b/tools/restatectl/src/commands/metadata/mod.rs @@ -14,7 +14,7 @@ use std::path::PathBuf; use std::str::FromStr; use restate_core::metadata_store::MetadataStoreClient; -use restate_metadata_server::local::create_client; +use restate_metadata_server::create_client; use restate_types::config::MetadataStoreClientOptions; use restate_types::net::AdvertisedAddress; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; diff --git a/tools/restatectl/src/environment/metadata_store.rs b/tools/restatectl/src/environment/metadata_store.rs index ff90ffbddd..fef5df7d02 100644 --- a/tools/restatectl/src/environment/metadata_store.rs +++ b/tools/restatectl/src/environment/metadata_store.rs @@ -67,7 +67,7 @@ pub async fn start_metadata_server( .wait_for_value(NodeRpcStatus::Ready) .await; - let client = restate_metadata_server::local::create_client(metadata_store_client_options) + let client = restate_metadata_server::create_client(metadata_store_client_options) .await .map_err(|e| anyhow::anyhow!("Failed to create metadata store client: {}", e))?;