From b8a4fe0b10a92ab2e2185835845a2644432c017d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 29 Jan 2025 22:01:53 +0100 Subject: [PATCH] Add smoke tests for the Raft based metadata cluster --- Cargo.lock | 2 + crates/local-cluster-runner/src/node/mod.rs | 65 +++-- crates/metadata-server/src/lib.rs | 43 ++++ crates/metadata-server/src/local/tests.rs | 44 +--- crates/types/src/config/common.rs | 5 + crates/types/src/config/mod.rs | 5 + crates/types/src/version.rs | 5 + server/Cargo.toml | 4 +- server/tests/raft_metadata_cluster.rs | 265 ++++++++++++++++++++ 9 files changed, 374 insertions(+), 64 deletions(-) create mode 100644 server/tests/raft_metadata_cluster.rs diff --git a/Cargo.lock b/Cargo.lock index ce92c3792a..38eb2e4676 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7027,6 +7027,7 @@ dependencies = [ "arc-swap", "async-trait", "bytes", + "bytestring", "clap", "codederror", "derive_builder", @@ -7039,6 +7040,7 @@ dependencies = [ "hyper-util", "mock-service-endpoint", "pin-project", + "rand", "regex", "reqwest", "restate-admin", diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index 36ea155a63..46fa85f3cc 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -182,7 +182,14 @@ impl Node { let mut nodes = Vec::with_capacity(usize::try_from(size).expect("u32 to fit into usize")); base_config.common.allow_bootstrap = false; - base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default()); + base_config.common.log_disable_ansi_codes = true; + if !matches!( + base_config.metadata_server.kind, + MetadataServerKind::Raft(_) + ) { + info!("Setting the metadata server to embedded"); + base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default()); + } for node_id in 1..=size { let mut effective_config = base_config.clone(); @@ -295,7 +302,7 @@ impl Node { inherit_env, env, searcher, - } = self; let 
node_base_dir = std::path::absolute( base_config @@ -332,7 +339,7 @@ impl Node { .await .map_err(NodeStartError::CreateLog)?; - let binary_path: OsString = binary_source.try_into()?; + let binary_path: OsString = binary_source.clone().try_into()?; let mut cmd = Command::new(&binary_path); if !inherit_env { @@ -342,13 +349,13 @@ impl Node { } .env("RESTATE_CONFIG", node_config_file) .env("DO_NOT_TRACK", "true") // avoid sending telemetry as part of tests - .envs(env) + .envs(env.clone()) .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true) .process_group(0) // avoid terminal control C being propagated - .args(&args); + .args(args); let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?; let pid = child.id().expect("child to have a pid"); @@ -429,10 +436,10 @@ impl Node { log_file: node_log_filename, status: StartedNodeStatus::Running { child_handle, - searcher, + searcher: searcher.clone(), pid, }, - config: base_config, + node: Some(self), }) } @@ -518,7 +525,7 @@ impl TryInto for BinarySource { pub struct StartedNode { log_file: PathBuf, status: StartedNodeStatus, - config: Configuration, + node: Option, } enum StartedNodeStatus { @@ -561,18 +568,17 @@ impl StartedNode { StartedNodeStatus::Exited(status) => Ok(status), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. 
} => { - info!( - "Sending SIGKILL to node {} (pid {})", - self.config.node_name(), - pid - ); + info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid); match nix::sys::signal::kill( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGKILL, ) { Ok(()) => (&mut self.status).await, Err(errno) => match errno { - nix::errno::Errno::ESRCH => Ok(ExitStatus::default()), // ignore "no such process" + nix::errno::Errno::ESRCH => { + self.status = StartedNodeStatus::Exited(ExitStatus::default()); + Ok(ExitStatus::default()) + } // ignore "no such process" _ => Err(io::Error::from_raw_os_error(errno as i32)), }, } @@ -586,11 +592,7 @@ impl StartedNode { StartedNodeStatus::Exited(_) => Ok(()), StartedNodeStatus::Failed(kind) => Err(kind.into()), StartedNodeStatus::Running { pid, .. } => { - info!( - "Sending SIGTERM to node {} (pid {})", - self.config.node_name(), - pid - ); + info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid); match nix::sys::signal::kill( nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGTERM, @@ -598,7 +600,7 @@ impl StartedNode { Err(nix::errno::Errno::ESRCH) => { warn!( "Node {} server process (pid {}) did not exist when sending SIGTERM", - self.config.node_name(), + self.node_name(), pid ); Ok(()) @@ -610,6 +612,17 @@ impl StartedNode { } } + pub async fn restart(&mut self) -> anyhow::Result<()> { + info!("Restarting node '{}'", self.config().node_name()); + self.kill().await?; + assert!( + !matches!(self.status, StartedNodeStatus::Running { .. }), + "Node should not be in status running after killing it." 
+ ); + *self = self.node.take().expect("to be present").start().await?; + Ok(()) + } + /// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result { match self.status { @@ -651,7 +664,7 @@ impl StartedNode { } pub fn config(&self) -> &Configuration { - &self.config + &self.node.as_ref().expect("to be present").base_config } pub fn node_name(&self) -> &str { @@ -708,7 +721,7 @@ impl StartedNode { pub async fn metadata_client( &self, ) -> Result { - restate_metadata_server::create_client(self.config.common.metadata_store_client.clone()) + restate_metadata_server::create_client(self.config().common.metadata_store_client.clone()) .await } @@ -764,8 +777,8 @@ impl StartedNode { /// Check to see if the metadata server has joined the metadata cluster. pub async fn metadata_server_joined_cluster(&self) -> bool { let mut metadata_server_client = MetadataServerSvcClient::new(create_tonic_channel( - self.config.common.advertised_address.clone(), - &self.config.networking, + self.config().common.advertised_address.clone(), + &self.config().networking, )); let Ok(response) = metadata_server_client @@ -838,7 +851,7 @@ impl Drop for StartedNode { if let StartedNodeStatus::Running { pid, .. 
} = self.status { warn!( "Node {} (pid {}) dropped without explicit shutdown", - self.config.node_name(), + self.config().node_name(), pid, ); match nix::sys::signal::kill( @@ -848,7 +861,7 @@ impl Drop for StartedNode { Ok(()) | Err(nix::errno::Errno::ESRCH) => {} err => error!( "Failed to send SIGKILL to running node {} (pid {}): {:?}", - self.config.node_name(), + self.config().node_name(), pid, err, ), diff --git a/crates/metadata-server/src/lib.rs b/crates/metadata-server/src/lib.rs index 858ddb29b4..8e4b9b1568 100644 --- a/crates/metadata-server/src/lib.rs +++ b/crates/metadata-server/src/lib.rs @@ -755,3 +755,46 @@ fn prepare_initial_nodes_configuration( Ok(plain_node_id) } + +#[cfg(any(test, feature = "test-util"))] +pub mod tests { + use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] + pub struct Value { + pub version: Version, + pub value: u32, + } + + impl Default for Value { + fn default() -> Self { + Self { + version: Version::MIN, + value: Default::default(), + } + } + } + + impl Value { + pub fn new(value: u32) -> Self { + Value { + value, + ..Value::default() + } + } + + pub fn next_version(mut self) -> Self { + self.version = self.version.next(); + self + } + } + + impl Versioned for Value { + fn version(&self) -> Version { + self.version + } + } + + flexbuffers_storage_encode_decode!(Value); +} diff --git a/crates/metadata-server/src/local/tests.rs b/crates/metadata-server/src/local/tests.rs index bead19d6a9..debfc27376 100644 --- a/crates/metadata-server/src/local/tests.rs +++ b/crates/metadata-server/src/local/tests.rs @@ -11,7 +11,6 @@ use bytestring::ByteString; use futures::stream::FuturesUnordered; use futures::StreamExt; -use serde::{Deserialize, Serialize}; use test_log::test; use restate_core::network::{FailingConnector, NetworkServerBuilder}; @@ -25,42 +24,13 @@ use 
restate_types::health::HealthStatus; use restate_types::live::{BoxedLiveLoad, Live}; use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::protobuf::common::NodeRpcStatus; -use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; +use restate_types::{Version, Versioned}; use crate::grpc::client::GrpcMetadataServerClient; use crate::local::LocalMetadataServer; +use crate::tests::Value; use crate::{MetadataServer, MetadataServerRunner, MetadataStoreClient, Precondition, WriteError}; -#[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] -struct Value { - version: Version, - value: String, -} - -impl Default for Value { - fn default() -> Self { - Self { - version: Version::MIN, - value: Default::default(), - } - } -} - -impl Value { - fn next_version(mut self) -> Self { - self.version = self.version.next(); - self - } -} - -impl Versioned for Value { - fn version(&self) -> Version { - self.version - } -} - -flexbuffers_storage_encode_decode!(Value); - /// Tests basic operations of the metadata store. 
#[test(restate_core::test(flavor = "multi_thread", worker_threads = 2))] async fn basic_metadata_store_operations() -> anyhow::Result<()> { @@ -69,17 +39,17 @@ async fn basic_metadata_store_operations() -> anyhow::Result<()> { let key: ByteString = "key".into(); let value = Value { version: Version::MIN, - value: "test_value".to_owned(), + value: 1, }; let next_value = Value { version: Version::from(2), - value: "next_value".to_owned(), + value: 2, }; let other_value = Value { version: Version::MIN, - value: "other_value".to_owned(), + value: 3, }; // first get should be empty @@ -227,7 +197,7 @@ async fn durable_storage() -> anyhow::Result<()> { metadata_key, &Value { version: Version::from(key), - value, + value: key, }, Precondition::DoesNotExist, ) @@ -258,7 +228,7 @@ async fn durable_storage() -> anyhow::Result<()> { client.get(metadata_key).await?, Some(Value { version: Version::from(key), - value + value: key, }) ); } diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 1f692093f9..753164400e 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -322,6 +322,11 @@ impl CommonOptions { }) } + #[cfg(any(test, feature = "test-util"))] + pub fn base_dir_opt(&self) -> Option<&PathBuf> { + self.base_dir.as_ref() + } + pub fn rocksdb_actual_total_memtables_size(&self) -> usize { let sanitized = self.rocksdb_total_memtables_ratio.clamp(0.0, 1.0) as f64; let total_mem = self.rocksdb_total_memory_size.get() as f64; diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index 1b384593b5..2bfe3009a3 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -139,6 +139,11 @@ pub fn reset_base_temp_dir_and_retain() -> PathBuf { pub fn set_current_config(config: Configuration) { #[cfg(not(any(test, feature = "test-util")))] let proposed_cwd = config.common.base_dir().join(config.node_name()); + #[cfg(any(test, feature = "test-util"))] + if let 
Some(base_dir) = config.common.base_dir_opt() { + // overwrite temp directory if an explicit base dir was configured + set_base_temp_dir(base_dir.clone().join(config.node_name())); + } // todo: potentially validate the config CONFIGURATION.store(Arc::new(config)); #[cfg(not(any(test, feature = "test-util")))] diff --git a/crates/types/src/version.rs b/crates/types/src/version.rs index cc44dcfb26..42a8d7edb2 100644 --- a/crates/types/src/version.rs +++ b/crates/types/src/version.rs @@ -35,6 +35,11 @@ impl Version { pub fn next(self) -> Self { Version(self.0 + 1) } + + #[cfg(feature = "test-util")] + pub fn prev(self) -> Self { + Version(self.0.saturating_sub(1)) + } } impl From for Version { diff --git a/server/Cargo.toml b/server/Cargo.toml index ba04ce38b3..a010ee4d9c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -73,7 +73,7 @@ restate-admin = { workspace = true, features = ["memory-loglet", "clients"] } restate-bifrost = { workspace = true, features = ["test-util"] } restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } -restate-metadata-server = { workspace = true } +restate-metadata-server = { workspace = true, features = ["test-util"] } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } mock-service-endpoint = { workspace = true } @@ -81,6 +81,7 @@ mock-service-endpoint = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +bytestring = { workspace = true} googletest = { workspace = true } hyper-util = { workspace = true } tempfile = { workspace = true } @@ -88,6 +89,7 @@ test-log = { workspace = true } tonic = { workspace = true, features = ["transport", "prost"] } tower = { workspace = true } tracing-subscriber = { workspace = true } +rand = { workspace = true } reqwest = { workspace = true } serde_json = { workspace = true } url = { workspace = true } diff --git 
a/server/tests/raft_metadata_cluster.rs b/server/tests/raft_metadata_cluster.rs new file mode 100644 index 0000000000..73922726d4 --- /dev/null +++ b/server/tests/raft_metadata_cluster.rs @@ -0,0 +1,265 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bytestring::ByteString; +use enumset::EnumSet; +use googletest::prelude::err; +use googletest::{assert_that, pat, IntoTestResult}; +use rand::prelude::SliceRandom; +use restate_core::metadata_store::{Precondition, WriteError}; +use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; +use restate_local_cluster_runner::cluster::Cluster; +use restate_local_cluster_runner::node::{BinarySource, HealthCheck, Node}; +use restate_metadata_server::create_client; +use restate_metadata_server::tests::Value; +use restate_types::config::{ + Configuration, MetadataServerKind, MetadataStoreClient, MetadataStoreClientOptions, RaftOptions, +}; +use restate_types::Versioned; +use std::time::{Duration, Instant}; +use tracing::info; + +#[test_log::test(restate_core::test)] +async fn raft_metadata_cluster_smoke_test() -> googletest::Result<()> { + let base_config = Configuration::default(); + + let nodes = Node::new_test_nodes( + base_config, + BinarySource::CargoTest, + EnumSet::empty(), + 3, + true, + ); + let mut cluster = Cluster::builder() + .cluster_name("raft_metadata_cluster_smoke_test") + .nodes(nodes) + .temp_base_dir() + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(30)).await?; + + let addresses = cluster + .nodes + .iter() + .map(|node| node.node_address().clone()) + .collect(); + + let metadata_store_client_options = MetadataStoreClientOptions { + 
metadata_store_client: MetadataStoreClient::Embedded { addresses }, + ..MetadataStoreClientOptions::default() + }; + let client = create_client(metadata_store_client_options) + .await + .expect("to not fail"); + + let value = Value::new(42); + let value_version = value.version(); + let key = ByteString::from_static("my-key"); + client + .put(key.clone(), &value, Precondition::DoesNotExist) + .await?; + + let stored_value = client.get::(key.clone()).await?; + assert_eq!(stored_value, Some(value)); + + let stored_value_version = client.get_version(key.clone()).await?; + assert_eq!(stored_value_version, Some(value_version)); + + let new_value = Value::new(1337); + let new_value_version = new_value.version(); + let failed_precondition = client + .put( + key.clone(), + &new_value, + Precondition::MatchesVersion(value_version.next()), + ) + .await; + assert_that!( + failed_precondition, + err(pat!(WriteError::FailedPrecondition(_))) + ); + + client + .put( + key.clone(), + &new_value, + Precondition::MatchesVersion(value_version), + ) + .await?; + let stored_new_value = client.get::(key.clone()).await?; + assert_eq!(stored_new_value, Some(new_value)); + + client + .delete(key.clone(), Precondition::MatchesVersion(new_value_version)) + .await?; + assert!(client.get::(key.clone()).await?.is_none()); + + cluster.graceful_shutdown(Duration::from_secs(3)).await?; + + Ok(()) +} + +#[test_log::test(restate_core::test)] +async fn raft_metadata_cluster_chaos_test() -> googletest::Result<()> { + let num_nodes = 3; + let chaos_duration = Duration::from_secs(20); + let mut base_config = Configuration::default(); + base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions { + raft_election_tick: 5, + raft_heartbeat_tick: 2, + ..RaftOptions::default() + }); + + let nodes = Node::new_test_nodes( + base_config, + BinarySource::CargoTest, + EnumSet::empty(), + num_nodes, + true, + ); + let mut cluster = Cluster::builder() + .cluster_name("raft_metadata_cluster_smoke_test") 
+ .nodes(nodes) + .temp_base_dir() + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(30)).await?; + + let addresses = cluster + .nodes + .iter() + .map(|node| node.node_address().clone()) + .collect(); + + let metadata_store_client_options = MetadataStoreClientOptions { + metadata_store_client: MetadataStoreClient::Embedded { addresses }, + ..MetadataStoreClientOptions::default() + }; + let client = create_client(metadata_store_client_options) + .await + .expect("to not fail"); + + let start_chaos = Instant::now(); + + let chaos_handle = TaskCenter::spawn_unmanaged(TaskKind::Background, "chaos", async move { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + + loop { + let node = cluster + .nodes + .choose_mut(&mut rand::thread_rng()) + .expect("at least one node being present"); + + tokio::select! { + _ = &mut shutdown => { + break; + }, + result = node.restart() => { + result?; + // wait until the cluster is healthy again + tokio::select! { + _ = &mut shutdown => { + break; + } + result = cluster.wait_check_healthy(HealthCheck::MetadataServer, Duration::from_secs(10)) => { + result?; + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + + Ok::<_, anyhow::Error>(cluster) + })?; + + let key = ByteString::from_static("my-key"); + let mut current_version = None; + let mut next_value = Value::new(1); + let mut test_state = State::Write; + + info!("Starting the metadata cluster chaos test"); + + while start_chaos.elapsed() < chaos_duration { + match test_state { + State::Write => { + let result = client + .put( + key.clone(), + &next_value, + current_version + .map(Precondition::MatchesVersion) + .unwrap_or(Precondition::DoesNotExist), + ) + .await; + if result.is_err() { + test_state = State::Reconcile; + } else { + current_version = Some(next_value.version()); + next_value = Value { + value: next_value.value + 1, + version: next_value.version.next(), + }; + } + } + State::Reconcile => { + let result = 
client.get::(key.clone()).await; + + if let Ok(value) = result { + // assert that read value is next_value or next_value - 1 + match value { + None => { + assert_eq!(current_version, None); + assert_eq!(next_value, Value::new(1)); + } + Some(read_value) => { + let previous_value = Value { + value: next_value.value - 1, + version: next_value.version().prev(), + }; + assert!(read_value == next_value || read_value == previous_value); + + current_version = Some(read_value.version()); + next_value = Value { + value: read_value.value + 1, + version: read_value.version.next(), + }; + } + } + + test_state = State::Write; + } + } + } + } + + // make sure that we have written at least some values + assert!(next_value.value > 1); + + info!( + "Finished metadata cluster chaos test with value: {}", + next_value.value + ); + + chaos_handle.cancel(); + let mut cluster = chaos_handle.await?.into_test_result()?; + cluster.graceful_shutdown(Duration::from_secs(3)).await?; + + Ok(()) +} + +enum State { + Write, + Reconcile, +}