Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add smoke tests for the Raft-based metadata cluster #2582

Merged
merged 5 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 40 additions & 29 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ impl Node {
let mut nodes = Vec::with_capacity(usize::try_from(size).expect("u32 to fit into usize"));

base_config.common.allow_bootstrap = false;
base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default());
base_config.common.log_disable_ansi_codes = true;
if !matches!(
base_config.metadata_server.kind,
MetadataServerKind::Raft(_)
) {
info!("Setting the metadata server to embedded");
base_config.metadata_server.kind = MetadataServerKind::Raft(RaftOptions::default());
}

for node_id in 1..=size {
let mut effective_config = base_config.clone();
Expand Down Expand Up @@ -295,7 +302,7 @@ impl Node {
inherit_env,
env,
searcher,
} = self;
} = &self;

let node_base_dir = std::path::absolute(
base_config
Expand Down Expand Up @@ -332,7 +339,7 @@ impl Node {
.await
.map_err(NodeStartError::CreateLog)?;

let binary_path: OsString = binary_source.try_into()?;
let binary_path: OsString = binary_source.clone().try_into()?;
let mut cmd = Command::new(&binary_path);

if !inherit_env {
Expand All @@ -342,13 +349,13 @@ impl Node {
}
.env("RESTATE_CONFIG", node_config_file)
.env("DO_NOT_TRACK", "true") // avoid sending telemetry as part of tests
.envs(env)
.envs(env.clone())
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.process_group(0) // avoid terminal control C being propagated
.args(&args);
.args(args);

let mut child = cmd.spawn().map_err(NodeStartError::SpawnError)?;
let pid = child.id().expect("child to have a pid");
Expand Down Expand Up @@ -429,10 +436,10 @@ impl Node {
log_file: node_log_filename,
status: StartedNodeStatus::Running {
child_handle,
searcher,
searcher: searcher.clone(),
pid,
},
config: base_config,
node: Some(self),
})
}

Expand Down Expand Up @@ -518,7 +525,7 @@ impl TryInto<OsString> for BinarySource {
pub struct StartedNode {
log_file: PathBuf,
status: StartedNodeStatus,
config: Configuration,
node: Option<Node>,
}

enum StartedNodeStatus {
Expand Down Expand Up @@ -561,18 +568,17 @@ impl StartedNode {
StartedNodeStatus::Exited(status) => Ok(status),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!(
"Sending SIGKILL to node {} (pid {})",
self.config.node_name(),
pid
);
info!("Sending SIGKILL to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Ok(()) => (&mut self.status).await,
Err(errno) => match errno {
nix::errno::Errno::ESRCH => Ok(ExitStatus::default()), // ignore "no such process"
nix::errno::Errno::ESRCH => {
self.status = StartedNodeStatus::Exited(ExitStatus::default());
Ok(ExitStatus::default())
} // ignore "no such process"
_ => Err(io::Error::from_raw_os_error(errno as i32)),
},
}
Expand All @@ -586,19 +592,15 @@ impl StartedNode {
StartedNodeStatus::Exited(_) => Ok(()),
StartedNodeStatus::Failed(kind) => Err(kind.into()),
StartedNodeStatus::Running { pid, .. } => {
info!(
"Sending SIGTERM to node {} (pid {})",
self.config.node_name(),
pid
);
info!("Sending SIGTERM to node {} (pid {})", self.node_name(), pid);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGTERM,
) {
Err(nix::errno::Errno::ESRCH) => {
warn!(
"Node {} server process (pid {}) did not exist when sending SIGTERM",
self.config.node_name(),
self.node_name(),
pid
);
Ok(())
Expand All @@ -610,6 +612,17 @@ impl StartedNode {
}
}

/// Restarts this node by killing the running process and starting it again
/// from the stored node definition.
///
/// # Errors
/// Returns an error if killing the process or starting the node fails.
///
/// # Panics
/// Panics if the stored node definition is absent, or if the node is still
/// reported as running after it was killed.
pub async fn restart(&mut self) -> anyhow::Result<()> {
    info!("Restarting node '{}'", self.config().node_name());
    self.kill().await?;

    // A successful kill must have moved the status away from `Running`.
    if matches!(self.status, StartedNodeStatus::Running { .. }) {
        panic!("Node should not be in status running after killing it.");
    }

    // The node definition is moved out before starting; if starting fails the
    // error is returned early and the definition is not put back.
    let node = self.node.take().expect("to be present");
    let restarted = node.start().await?;
    *self = restarted;
    Ok(())
}

/// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<ExitStatus> {
match self.status {
Expand Down Expand Up @@ -651,7 +664,7 @@ impl StartedNode {
}

pub fn config(&self) -> &Configuration {
&self.config
&self.node.as_ref().expect("to be present").base_config
}

pub fn node_name(&self) -> &str {
Expand Down Expand Up @@ -708,10 +721,8 @@ impl StartedNode {
pub async fn metadata_client(
&self,
) -> Result<restate_metadata_server::MetadataStoreClient, GenericError> {
restate_metadata_server::local::create_client(
self.config.common.metadata_store_client.clone(),
)
.await
restate_metadata_server::create_client(self.config().common.metadata_store_client.clone())
.await
}

/// Check to see if the admin address is healthy. Returns false if this node has no admin role.
Expand Down Expand Up @@ -766,8 +777,8 @@ impl StartedNode {
/// Check to see if the metadata server has joined the metadata cluster.
pub async fn metadata_server_joined_cluster(&self) -> bool {
let mut metadata_server_client = MetadataServerSvcClient::new(create_tonic_channel(
self.config.common.advertised_address.clone(),
&self.config.networking,
self.config().common.advertised_address.clone(),
&self.config().networking,
));

let Ok(response) = metadata_server_client
Expand Down Expand Up @@ -840,7 +851,7 @@ impl Drop for StartedNode {
if let StartedNodeStatus::Running { pid, .. } = self.status {
warn!(
"Node {} (pid {}) dropped without explicit shutdown",
self.config.node_name(),
self.config().node_name(),
pid,
);
match nix::sys::signal::kill(
Expand All @@ -850,7 +861,7 @@ impl Drop for StartedNode {
Ok(()) | Err(nix::errno::Errno::ESRCH) => {}
err => error!(
"Failed to send SIGKILL to running node {} (pid {}): {:?}",
self.config.node_name(),
self.config().node_name(),
pid,
err,
),
Expand Down
82 changes: 79 additions & 3 deletions crates/metadata-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

pub mod grpc;
pub mod local;
mod network;
pub mod raft;
mod util;

use crate::grpc::client::GrpcMetadataServerClient;
use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::metadata_server_svc_server::MetadataServerSvcServer;
use assert2::let_assert;
Expand All @@ -22,14 +22,18 @@ use bytestring::ByteString;
use grpc::pb_conversions::ConversionError;
use prost::Message;
use raft_proto::eraftpb::Snapshot;
use restate_core::metadata_store::providers::{
create_object_store_based_meta_store, EtcdMetadataStore,
};
use restate_core::metadata_store::VersionedValue;
pub use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError,
};
use restate_core::network::NetworkServerBuilder;
use restate_core::{MetadataWriter, ShutdownError};
use restate_types::config::{
Configuration, MetadataServerKind, MetadataServerOptions, RocksDbOptions,
Configuration, MetadataServerKind, MetadataServerOptions, MetadataStoreClientOptions,
RocksDbOptions,
};
use restate_types::errors::GenericError;
use restate_types::health::HealthStatus;
Expand All @@ -40,7 +44,7 @@ use restate_types::nodes_config::{
};
use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use restate_types::{config, GenerationalNodeId, PlainNodeId, Version};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::future::Future;
Expand Down Expand Up @@ -661,6 +665,35 @@ impl Default for MetadataServerConfiguration {
}
}

/// Creates a [`MetadataStoreClient`] for the configured metadata store.
///
/// The backend is chosen by the `metadata_store_client` field of the given
/// options: the embedded gRPC client, etcd, or an object store. The configured
/// backoff policy is passed to the resulting client in every case.
///
/// # Errors
/// Returns an error if the etcd store or the object-store-based store cannot
/// be created.
pub async fn create_client(
    metadata_store_client_options: MetadataStoreClientOptions,
) -> Result<MetadataStoreClient, GenericError> {
    let backoff_policy = Some(
        metadata_store_client_options
            .metadata_store_client_backoff_policy
            .clone(),
    );
    // Match on a clone so the options stay usable inside the arms.
    let backend = metadata_store_client_options.metadata_store_client.clone();

    Ok(match backend {
        config::MetadataStoreClient::Embedded { addresses } => MetadataStoreClient::new(
            GrpcMetadataServerClient::new(addresses, metadata_store_client_options),
            backoff_policy,
        ),
        config::MetadataStoreClient::Etcd { addresses } => MetadataStoreClient::new(
            EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?,
            backoff_policy,
        ),
        conf @ config::MetadataStoreClient::ObjectStore { .. } => MetadataStoreClient::new(
            create_object_store_based_meta_store(conf).await?,
            backoff_policy,
        ),
    })
}

/// Ensures that the initial nodes configuration contains the current node and has the right
/// [`MetadataServerState`] set.
fn prepare_initial_nodes_configuration(
Expand Down Expand Up @@ -722,3 +755,46 @@ fn prepare_initial_nodes_configuration(

Ok(plain_node_id)
}

#[cfg(any(test, feature = "test-util"))]
pub mod tests {
    use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
    use serde::{Deserialize, Serialize};

    /// Simple versioned value, compiled only for tests or with the
    /// `test-util` feature.
    #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)]
    pub struct Value {
        pub version: Version,
        pub value: u32,
    }

    impl Default for Value {
        /// Returns a value of `0` at [`Version::MIN`].
        fn default() -> Self {
            Self {
                version: Version::MIN,
                value: 0,
            }
        }
    }

    impl Value {
        /// Creates a value holding `value` at [`Version::MIN`].
        pub fn new(value: u32) -> Self {
            Self {
                version: Version::MIN,
                value,
            }
        }

        /// Consumes `self` and returns it with the version advanced by one step.
        pub fn next_version(self) -> Self {
            let bumped = self.version.next();
            Self {
                version: bumped,
                value: self.value,
            }
        }
    }

    impl Versioned for Value {
        fn version(&self) -> Version {
            self.version
        }
    }

    flexbuffers_storage_encode_decode!(Value);
}
36 changes: 0 additions & 36 deletions crates/metadata-server/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::grpc::client::GrpcMetadataServerClient;
use restate_core::metadata_store::providers::create_object_store_based_meta_store;
use restate_core::metadata_store::{providers::EtcdMetadataStore, MetadataStoreClient};
use restate_core::network::NetworkServerBuilder;
use restate_rocksdb::RocksError;
use restate_types::config::{MetadataServerOptions, RocksDbOptions};
use restate_types::health::HealthStatus;
use restate_types::live::BoxedLiveLoad;
use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::{
config::{MetadataStoreClient as MetadataStoreClientConfig, MetadataStoreClientOptions},
errors::GenericError,
};

mod store;

use crate::MetadataServerRunner;
pub use store::LocalMetadataServer;

/// Creates a [`MetadataStoreClient`] for the metadata store selected in the given options.
///
/// Depending on the `metadata_store_client` field this is backed by a
/// [`GrpcMetadataServerClient`] (embedded), an [`EtcdMetadataStore`], or an
/// object-store-based store. The configured backoff policy is handed to the
/// resulting client in every case.
///
/// # Errors
/// Returns an error if the etcd store or the object-store-based store cannot
/// be created.
pub async fn create_client(
metadata_store_client_options: MetadataStoreClientOptions,
) -> Result<MetadataStoreClient, GenericError> {
let backoff_policy = Some(
metadata_store_client_options
.metadata_store_client_backoff_policy
.clone(),
);

// Match on a clone so the options can still be passed to the backend constructors below.
let client = match metadata_store_client_options.metadata_store_client.clone() {
MetadataStoreClientConfig::Embedded { addresses } => {
let inner_client =
GrpcMetadataServerClient::new(addresses, metadata_store_client_options);
MetadataStoreClient::new(inner_client, backoff_policy)
}
MetadataStoreClientConfig::Etcd { addresses } => {
let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?;
MetadataStoreClient::new(store, backoff_policy)
}
conf @ MetadataStoreClientConfig::ObjectStore { .. } => {
let store = create_object_store_based_meta_store(conf).await?;
MetadataStoreClient::new(store, backoff_policy)
}
};

Ok(client)
}

pub(crate) async fn create_server(
metadata_server_options: &MetadataServerOptions,
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
Expand Down
Loading
Loading