Skip to content

Commit

Permalink
Add smoke tests for the Raft-based metadata cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jan 30, 2025
1 parent a2ddb28 commit 9446ad4
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 64 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 39 additions & 26 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ impl Node {
let mut nodes = Vec::with_capacity(usize::try_from(size).expect("u32 to fit into usize"));

base_config.common.allow_bootstrap = false;
base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default());
base_config.common.log_disable_ansi_codes = true;
if !matches!(
base_config.metadata_server.kind,
MetadataServerKind::Raft(_)
) {
info!("Setting the metadata server to embedded");
base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default());
}

for node_id in 1..=size {
let mut effective_config = base_config.clone();
Expand Down Expand Up @@ -295,7 +302,7 @@ impl Node {
inherit_env,
env,
searcher,
} = self;
} = &self;

let node_base_dir = std::path::absolute(
base_config
Expand Down Expand Up @@ -332,7 +339,7 @@ impl Node {
.await
.map_err(NodeStartError::CreateLog)?;

let binary_path: OsString = binary_source.try_into()?;
let binary_path: OsString = binary_source.clone().try_into()?;
let mut cmd = Command::new(&binary_path);

if !inherit_env {
Expand All @@ -342,13 +349,13 @@ impl Node {
}
.env("RESTATE_CONFIG", node_config_file)
.env("DO_NOT_TRACK", "true") // avoid sending telemetry as part of tests
.envs(env)
.envs(env.clone())
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.process_group(0) // avoid terminal control C being propagated
.args(&args);
.args(args);

let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?;
let pid = child.id().expect("child to have a pid");
Expand Down Expand Up @@ -429,10 +436,10 @@ impl Node {
log_file: node_log_filename,
status: StartedNodeStatus::Running {
child_handle,
searcher,
searcher: searcher.clone(),
pid,
},
config: base_config,
node: Some(self),
})
}

Expand Down Expand Up @@ -518,7 +525,7 @@ impl TryInto<OsString> for BinarySource {
pub struct StartedNode {
log_file: PathBuf,
status: StartedNodeStatus,
config: Configuration,
node: Option<Node>,
}

enum StartedNodeStatus {
Expand Down Expand Up @@ -561,18 +568,17 @@ impl StartedNode {
StartedNodeStatus::Exited(status) => Ok(status),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!(
"Sending SIGKILL to node {} (pid {})",
self.config.node_name(),
pid
);
info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Ok(()) => (&mut self.status).await,
Err(errno) => match errno {
nix::errno::Errno::ESRCH => Ok(ExitStatus::default()), // ignore "no such process"
nix::errno::Errno::ESRCH => {
self.status = StartedNodeStatus::Exited(ExitStatus::default());
Ok(ExitStatus::default())
} // ignore "no such process"
_ => Err(io::Error::from_raw_os_error(errno as i32)),
},
}
Expand All @@ -586,19 +592,15 @@ impl StartedNode {
StartedNodeStatus::Exited(_) => Ok(()),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!(
"Sending SIGTERM to node {} (pid {})",
self.config.node_name(),
pid
);
info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGTERM,
) {
Err(nix::errno::Errno::ESRCH) => {
warn!(
"Node {} server process (pid {}) did not exist when sending SIGTERM",
self.config.node_name(),
self.node_name(),
pid
);
Ok(())
Expand All @@ -610,6 +612,17 @@ impl StartedNode {
}
}

/// Restarts this node in place: kills the current process via [`Self::kill`], then starts the
/// stored [`Node`] again and replaces `self` with the freshly started instance.
///
/// # Errors
/// Returns an error if killing the process fails or if starting the node again fails.
///
/// # Panics
/// Panics if the status is still `Running` after the kill, or if no [`Node`] is stored in
/// `self.node`.
pub async fn restart(&mut self) -> anyhow::Result<()> {
    info!("Restarting node '{}'", self.config().node_name());
    self.kill().await?;

    // The kill above must have moved the node out of the running state before we respawn it.
    if matches!(self.status, StartedNodeStatus::Running { .. }) {
        panic!("Node should not be in status running after killing it.");
    }

    // NOTE(review): the node is taken out of `self` before `start()` runs; if `start()` fails,
    // `self.node` stays `None` and later `config()` calls would hit their `expect` — confirm
    // whether callers may keep using `self` after a failed restart.
    let node = self.node.take().expect("to be present");
    let restarted = node.start().await?;
    *self = restarted;
    Ok(())
}

/// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<ExitStatus> {
match self.status {
Expand Down Expand Up @@ -651,7 +664,7 @@ impl StartedNode {
}

pub fn config(&self) -> &Configuration {
&self.config
&self.node.as_ref().expect("to be present").base_config
}

pub fn node_name(&self) -> &str {
Expand Down Expand Up @@ -708,7 +721,7 @@ impl StartedNode {
pub async fn metadata_client(
&self,
) -> Result<restate_metadata_server::MetadataStoreClient, GenericError> {
restate_metadata_server::create_client(self.config.common.metadata_store_client.clone())
restate_metadata_server::create_client(self.config().common.metadata_store_client.clone())
.await
}

Expand Down Expand Up @@ -764,8 +777,8 @@ impl StartedNode {
/// Check to see if the metadata server has joined the metadata cluster.
pub async fn metadata_server_joined_cluster(&self) -> bool {
let mut metadata_server_client = MetadataServerSvcClient::new(create_tonic_channel(
self.config.common.advertised_address.clone(),
&self.config.networking,
self.config().common.advertised_address.clone(),
&self.config().networking,
));

let Ok(response) = metadata_server_client
Expand Down Expand Up @@ -838,7 +851,7 @@ impl Drop for StartedNode {
if let StartedNodeStatus::Running { pid, .. } = self.status {
warn!(
"Node {} (pid {}) dropped without explicit shutdown",
self.config.node_name(),
self.config().node_name(),
pid,
);
match nix::sys::signal::kill(
Expand All @@ -848,7 +861,7 @@ impl Drop for StartedNode {
Ok(()) | Err(nix::errno::Errno::ESRCH) => {}
err => error!(
"Failed to send SIGKILL to running node {} (pid {}): {:?}",
self.config.node_name(),
self.config().node_name(),
pid,
err,
),
Expand Down
43 changes: 43 additions & 0 deletions crates/metadata-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,3 +755,46 @@ fn prepare_initial_nodes_configuration(

Ok(plain_node_id)
}

/// Test fixtures for exercising the metadata server; compiled only for tests or when the
/// `test-util` feature is enabled.
#[cfg(any(test, feature = "test-util"))]
pub mod tests {
    use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
    use serde::{Deserialize, Serialize};

    /// Small versioned payload: a `u32` tagged with the [`Version`] it was written at.
    #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)]
    pub struct Value {
        pub version: Version,
        pub value: u32,
    }

    impl Value {
        /// Creates a value holding `value` at [`Version::MIN`].
        pub fn new(value: u32) -> Self {
            Self {
                version: Version::MIN,
                value,
            }
        }

        /// Consumes the value and returns it with its version advanced by one step; the
        /// payload is left untouched.
        pub fn next_version(self) -> Self {
            Self {
                version: self.version.next(),
                ..self
            }
        }
    }

    impl Default for Value {
        /// The default value carries the default `u32` payload at [`Version::MIN`].
        fn default() -> Self {
            Self::new(u32::default())
        }
    }

    impl Versioned for Value {
        fn version(&self) -> Version {
            self.version
        }
    }

    // Generates the storage encode/decode implementations for `Value`.
    flexbuffers_storage_encode_decode!(Value);
}
44 changes: 7 additions & 37 deletions crates/metadata-server/src/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use bytestring::ByteString;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use test_log::test;

use restate_core::network::{FailingConnector, NetworkServerBuilder};
Expand All @@ -25,42 +24,13 @@ use restate_types::health::HealthStatus;
use restate_types::live::{BoxedLiveLoad, Live};
use restate_types::net::{AdvertisedAddress, BindAddress};
use restate_types::protobuf::common::NodeRpcStatus;
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use restate_types::{Version, Versioned};

use crate::grpc::client::GrpcMetadataServerClient;
use crate::local::LocalMetadataServer;
use crate::tests::Value;
use crate::{MetadataServer, MetadataServerRunner, MetadataStoreClient, Precondition, WriteError};

#[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)]
struct Value {
version: Version,
value: String,
}

impl Default for Value {
fn default() -> Self {
Self {
version: Version::MIN,
value: Default::default(),
}
}
}

impl Value {
fn next_version(mut self) -> Self {
self.version = self.version.next();
self
}
}

impl Versioned for Value {
fn version(&self) -> Version {
self.version
}
}

flexbuffers_storage_encode_decode!(Value);

/// Tests basic operations of the metadata store.
#[test(restate_core::test(flavor = "multi_thread", worker_threads = 2))]
async fn basic_metadata_store_operations() -> anyhow::Result<()> {
Expand All @@ -69,17 +39,17 @@ async fn basic_metadata_store_operations() -> anyhow::Result<()> {
let key: ByteString = "key".into();
let value = Value {
version: Version::MIN,
value: "test_value".to_owned(),
value: 1,
};

let next_value = Value {
version: Version::from(2),
value: "next_value".to_owned(),
value: 2,
};

let other_value = Value {
version: Version::MIN,
value: "other_value".to_owned(),
value: 3,
};

// first get should be empty
Expand Down Expand Up @@ -227,7 +197,7 @@ async fn durable_storage() -> anyhow::Result<()> {
metadata_key,
&Value {
version: Version::from(key),
value,
value: key,
},
Precondition::DoesNotExist,
)
Expand Down Expand Up @@ -258,7 +228,7 @@ async fn durable_storage() -> anyhow::Result<()> {
client.get(metadata_key).await?,
Some(Value {
version: Version::from(key),
value
value: key,
})
);
}
Expand Down
5 changes: 5 additions & 0 deletions crates/types/src/config/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ impl CommonOptions {
})
}

/// Returns a reference to the stored base directory if one is set, or `None` otherwise.
///
/// Only available in tests or with the `test-util` feature enabled.
#[cfg(any(test, feature = "test-util"))]
pub fn base_dir_opt(&self) -> Option<&PathBuf> {
    let configured = &self.base_dir;
    configured.as_ref()
}

pub fn rocksdb_actual_total_memtables_size(&self) -> usize {
let sanitized = self.rocksdb_total_memtables_ratio.clamp(0.0, 1.0) as f64;
let total_mem = self.rocksdb_total_memory_size.get() as f64;
Expand Down
5 changes: 5 additions & 0 deletions crates/types/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub fn reset_base_temp_dir_and_retain() -> PathBuf {
pub fn set_current_config(config: Configuration) {
#[cfg(not(any(test, feature = "test-util")))]
let proposed_cwd = config.common.base_dir().join(config.node_name());
#[cfg(any(test, feature = "test-util"))]
if let Some(base_dir) = config.common.base_dir_opt() {
// overwrite temp directory if an explicit base dir was configured
set_base_temp_dir(base_dir.clone().join(config.node_name()));
}
// todo: potentially validate the config
CONFIGURATION.store(Arc::new(config));
#[cfg(not(any(test, feature = "test-util")))]
Expand Down
5 changes: 5 additions & 0 deletions crates/types/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ impl Version {
pub fn next(self) -> Self {
Version(self.0 + 1)
}

/// Returns the version that directly precedes this one, saturating at the inner integer's
/// minimum instead of underflowing.
///
/// Only available with the `test-util` feature enabled.
#[cfg(feature = "test-util")]
pub fn prev(self) -> Self {
    let Version(current) = self;
    Version(current.saturating_sub(1))
}
}

impl From<crate::protobuf::common::Version> for Version {
Expand Down
4 changes: 3 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,23 @@ restate-admin = { workspace = true, features = ["memory-loglet", "clients"] }
restate-bifrost = { workspace = true, features = ["test-util"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-local-cluster-runner = { workspace = true }
restate-metadata-server = { workspace = true }
restate-metadata-server = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
mock-service-endpoint = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true}
googletest = { workspace = true }
hyper-util = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tonic = { workspace = true, features = ["transport", "prost"] }
tower = { workspace = true }
tracing-subscriber = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
url = { workspace = true }
Expand Down
Loading

0 comments on commit 9446ad4

Please sign in to comment.