From c352f0e97be14252eddc188acd0ab5883b6b6844 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 17 Feb 2025 09:13:33 +0200 Subject: [PATCH 1/5] Fix typo in reconfigure --- tools/restatectl/src/commands/log/reconfigure.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index 6157ffa3c..328ea00c8 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -157,7 +157,7 @@ async fn inner_reconfigure( c_println!(" ├ Tail LSN: {}", sealed_segment.tail_offset); c_println!(" ├ Provider: {}", sealed_segment.provider); let Ok(provider) = ProviderKind::from_str(&sealed_segment.provider, true) else { - c_eprintln!("Unkown provider type '{}'", sealed_segment.provider); + c_eprintln!("Unknown provider type '{}'", sealed_segment.provider); return Ok(()); }; @@ -200,7 +200,7 @@ fn replicated_loglet_params( ReplicatedLogletParams { loglet_id, nodeset: if opts.nodeset.is_empty() { - anyhow::bail!("Missing nodeset. Nodeset is required if last segment is not of replicated type"); + bail!("Missing nodeset. 
Nodeset is required if last segment is not of replicated type"); } else { NodeSet::from_iter(opts.nodeset.iter().cloned()) }, From cdc4026186e58afcb8a87ece97a5a84331bae864 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 17 Feb 2025 11:28:34 +0200 Subject: [PATCH 2/5] Update headings for logs and metadata list --- .../restatectl/src/commands/log/list_logs.rs | 48 ++++++++++--------- .../src/commands/metadata_server/status.rs | 1 + 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/tools/restatectl/src/commands/log/list_logs.rs b/tools/restatectl/src/commands/log/list_logs.rs index 53eae4640..82db7231d 100644 --- a/tools/restatectl/src/commands/log/list_logs.rs +++ b/tools/restatectl/src/commands/log/list_logs.rs @@ -34,7 +34,7 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any let mut logs_table = Table::new_styled(); - c_println!("Log chain {}", logs.version()); + c_println!("Logs {}", logs.version()); write_default_provider( &mut Console::stdout(), @@ -45,29 +45,33 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any // sort by log-id for display let logs: BTreeMap = logs.iter().map(|(id, chain)| (*id, chain)).collect(); - for (log_id, chain) in logs { - let params = deserialize_replicated_log_params(&chain.tail()); - logs_table.add_row(vec![ - Cell::new(log_id), - Cell::new(format!("{}", &chain.tail().base_lsn)), - Cell::new(format!("{:?}", chain.tail().config.kind)), - render_loglet_params(&params, |p| Cell::new(p.loglet_id)), - render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.replication))), - render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.sequencer))), - render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.nodeset))), + if logs.is_empty() { + c_println!("No logs found. 
Has the cluster been provisioned yet?"); + } else { + for (log_id, chain) in logs { + let params = deserialize_replicated_log_params(&chain.tail()); + logs_table.add_row(vec![ + Cell::new(log_id), + Cell::new(format!("{}", &chain.tail().base_lsn)), + Cell::new(format!("{:?}", chain.tail().config.kind)), + render_loglet_params(&params, |p| Cell::new(p.loglet_id)), + render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.replication))), + render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.sequencer))), + render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.nodeset))), + ]); + } + + logs_table.set_styled_header(vec![ + "L-ID", + "FROM-LSN", + "KIND", + "LOGLET-ID", + "REPLICATION", + "SEQUENCER", + "NODESET", ]); + c_println!("{}", logs_table); } - logs_table.set_styled_header(vec![ - "L-ID", - "FROM-LSN", - "KIND", - "LOGLET-ID", - "REPLICATION", - "SEQUENCER", - "NODESET", - ]); - c_println!("{}", logs_table); - Ok(()) } diff --git a/tools/restatectl/src/commands/metadata_server/status.rs b/tools/restatectl/src/commands/metadata_server/status.rs index caa087b87..cbdc37546 100644 --- a/tools/restatectl/src/commands/metadata_server/status.rs +++ b/tools/restatectl/src/commands/metadata_server/status.rs @@ -119,6 +119,7 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul ]); } + c_println!("Metadata service"); c_println!("{}", metadata_nodes_table); if !unreachable_nodes.is_empty() { From f1149a9cc5f787fa57def150fded6c9b8a46a443 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 17 Feb 2025 11:35:47 +0200 Subject: [PATCH 3/5] Remember unresponsive nodes, refresh metadata service status concurrently --- Cargo.lock | 1 + tools/restatectl/Cargo.toml | 1 + .../src/commands/metadata_server/status.rs | 39 +++- tools/restatectl/src/connection.rs | 194 ++++++++++++------ 4 files changed, 166 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b286ecd21..5eb971a78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ 
-7627,6 +7627,7 @@ dependencies = [ "crossterm 0.27.0", "ctrlc", "diff", + "futures", "futures-util", "itertools 0.14.0", "json-patch", diff --git a/tools/restatectl/Cargo.toml b/tools/restatectl/Cargo.toml index 0aef6f275..bf53d943a 100644 --- a/tools/restatectl/Cargo.toml +++ b/tools/restatectl/Cargo.toml @@ -44,6 +44,7 @@ cling = { workspace = true } crossterm = { version = "0.27.0" } ctrlc = { version = "3.4" } diff = "0.1.13" +futures = { workspace = true } futures-util = { workspace = true } itertools = { workspace = true } json-patch = "2.0.0" diff --git a/tools/restatectl/src/commands/metadata_server/status.rs b/tools/restatectl/src/commands/metadata_server/status.rs index cbdc37546..5de225f73 100644 --- a/tools/restatectl/src/commands/metadata_server/status.rs +++ b/tools/restatectl/src/commands/metadata_server/status.rs @@ -10,9 +10,11 @@ use std::collections::BTreeMap; +use futures::future::join_all; use itertools::Itertools; use tonic::codec::CompressionEncoding; -use tonic::IntoRequest; +use tonic::{IntoRequest, Status}; +use tracing::debug; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; @@ -23,9 +25,10 @@ use restate_types::protobuf::common::MetadataServerStatus; use restate_types::{PlainNodeId, Version}; use crate::connection::ConnectionInfo; -use crate::util::grpc_channel; pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Result<()> { + debug!("Gathering metadata server status information"); + let nodes_configuration = connection.get_nodes_configuration().await?; let mut metadata_nodes_table = Table::new_styled(); let header = vec![ @@ -45,13 +48,31 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul let mut unreachable_nodes = BTreeMap::default(); - for (node_id, node_config) in nodes_configuration.iter_role(Role::MetadataServer) { - let metadata_channel = grpc_channel(node_config.address.clone()); - let mut metadata_client = 
MetadataServerSvcClient::new(metadata_channel) - .accept_compressed(CompressionEncoding::Gzip); - - let metadata_store_status = metadata_client.status(().into_request()).await; - + let futures = + nodes_configuration + .iter_role(Role::MetadataServer) + .map(|(node_id, node_config)| { + let address = node_config.address.clone(); + async move { + let channel = match connection.connect(&address).await { + Ok(channel) => channel, + Err(conn_error) => { + return (node_id, Err(Status::from_error(Box::new(conn_error)))); + } + }; + + let mut metadata_client = MetadataServerSvcClient::new(channel) + .accept_compressed(CompressionEncoding::Gzip); + + debug!("Querying metadata service status on node {address}"); + let metadata_store_status = metadata_client.status(().into_request()).await; + + (node_id, metadata_store_status) + } + }); + let results = join_all(futures).await; + + for (node_id, metadata_store_status) in results { let status = match metadata_store_status { Ok(response) => response.into_inner(), Err(err) => { diff --git a/tools/restatectl/src/connection.rs b/tools/restatectl/src/connection.rs index 52b10dd5b..9e950d563 100644 --- a/tools/restatectl/src/connection.rs +++ b/tools/restatectl/src/connection.rs @@ -8,14 +8,16 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use std::{cmp::Ordering, collections::HashMap, fmt::Display, future::Future, sync::Arc}; +use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; +use std::{cmp::Ordering, fmt::Display, future::Future, sync::Arc}; use cling::{prelude::Parser, Collect}; use itertools::{Either, Itertools}; use rand::{rng, seq::SliceRandom}; use tokio::sync::{Mutex, MutexGuard}; -use tonic::{codec::CompressionEncoding, transport::Channel, Response, Status}; -use tracing::info; +use tonic::{codec::CompressionEncoding, transport::Channel, Code, Response, Status}; +use tracing::{debug, info}; use restate_core::protobuf::node_ctl_svc::{ node_ctl_svc_client::NodeCtlSvcClient, GetMetadataRequest, IdentResponse, @@ -33,10 +35,12 @@ use crate::util::grpc_channel; #[derive(Clone, Parser, Collect, Debug)] pub struct ConnectionInfo { - /// Specify server address to connect to. + /// Specify one or more server addresses to connect to. /// - /// It needs access to the node-to-node address (aka. node advertised address) - /// Can also accept a comma-separated list or by repeating `--address=`. + /// Needs access to the node-to-node address (aka node advertised address). + /// Specify multiple addresses as a comma-separated list, or pass multiple + /// `--address=` arguments. Additional addresses may be discovered + /// based on the configuration of reachable nodes. #[clap( long, short('s'), @@ -50,8 +54,9 @@ pub struct ConnectionInfo { )] pub address: Vec, - /// Use this option to avoid receiving stale metadata information from the nodes by reading it - /// from the metadata store. + /// Force a reload of the metadata from the metadata store. Typically, Restate nodes hold + /// a cached view of metadata such as cluster nodes or partition configuration. Use this flag + /// to always fetch the latest as part of the request. 
#[arg(long)] pub sync_metadata: bool, @@ -62,21 +67,25 @@ pub struct ConnectionInfo { logs: Arc>>, #[clap(skip)] - cache: Arc>>, + open_connections: Arc>>, + + #[clap(skip)] + dead_nodes: Arc>>, } impl ConnectionInfo { - /// Gets NodesConfiguration object. This function tries all provided addresses and makes sure - /// nodes configuration is cached. + /// Gets NodesConfiguration object. Tries all provided addresses and caches the + /// response. Always uses the address seed provided on the command line. pub async fn get_nodes_configuration(&self) -> Result { if self.address.is_empty() { return Err(ConnectionInfoError::NoAvailableNodes(NoRoleError(None))); } let guard = self.nodes_configuration.lock().await; + if guard.is_some() { + debug!("Using cached nodes configuration"); + } - // get nodes configuration will always use the addresses seed - // provided via the cmdline self.get_latest_metadata( self.address.iter(), self.address.len(), @@ -104,11 +113,19 @@ impl ConnectionInfo { nodes_addresses.shuffle(&mut rng()); let cluster_size = nodes_addresses.len(); - let cached = self.cache.lock().await.keys().cloned().collect::>(); + let cached = self + .open_connections + .lock() + .await + .keys() + .cloned() + .collect::>(); assert!(!cached.is_empty(), "must have cached connections"); - let try_nodes = cached + // To improve our chance of getting the latest logs definition, we read from a simple + // majority of nodes. Existing connections take precedence. + let logs_source_nodes = cached .iter() .chain( nodes_addresses @@ -117,13 +134,8 @@ impl ConnectionInfo { ) .collect::>(); - // To be sure we landed on the best guess of the latest version of the logs - // we need to ask multiple nodes in the nodes config. - // We make sure cached nodes has higher precedence + 50% of node set size trimmed to - // a total of 50% + 1 nodes. 
- self.get_latest_metadata( - try_nodes.into_iter(), + logs_source_nodes.into_iter(), (cluster_size / 2) + 1, MetadataKind::Logs, guard, @@ -132,15 +144,15 @@ impl ConnectionInfo { .await } - /// Gets Metadata object. On successful responses, [`get_latest_metadata`] will only try `try_best_of` - /// of the provided addresses, or until all nodes are exhausted. + /// Gets the latest metadata value. Stops after `stop_after_responses` nodes + /// respond, otherwise keeps trying until all addresses are exhausted. async fn get_latest_metadata( &self, addresses: impl Iterator, - mut try_best_of: usize, + mut stop_after_responses: usize, kind: MetadataKind, mut guard: MutexGuard<'_, Option>, - version_map: M, + extract_version: M, ) -> Result where T: StorageDecode + Versioned + Clone, @@ -150,10 +162,10 @@ impl ConnectionInfo { return Ok(meta.clone()); } - let mut latest_meta: Option = None; - let mut answer = false; + let mut latest_value: Option = None; + let mut any_node_responded = false; let mut errors = NodesErrors::default(); - let mut cache = self.cache.lock().await; + let mut open_connections = self.open_connections.lock().await; let request = GetMetadataRequest { kind: kind.into(), @@ -161,7 +173,7 @@ impl ConnectionInfo { }; for address in addresses { - let channel = cache.entry(address.clone()).or_insert_with(|| { + let channel = open_connections.entry(address.clone()).or_insert_with(|| { info!("Connecting to {address}"); grpc_channel(address.clone()) }); @@ -174,25 +186,32 @@ impl ConnectionInfo { Ok(response) => response.into_inner(), Err(status) => { errors.error(address.clone(), status); + self.dead_nodes.write().unwrap().insert(address.clone()); continue; } }; - // node is reachable and has answered - answer = true; + any_node_responded = true; if response.status != NodeStatus::Alive as i32 { - // node did not join the cluster yet. 
+ debug!("Node {address} responded but it is not reporting itself as alive, and will be skipped"); continue; } - if version_map(&response) - <= latest_meta + let response_version = extract_version(&response); + match response_version.cmp( + &latest_value .as_ref() .map(|c| c.version()) - .unwrap_or(Version::INVALID) - { - // has older version than we have. - continue; + .unwrap_or(Version::INVALID), + ) { + Ordering::Less => { + debug!("Node {address} returned an older version {response_version} than we currently have"); + continue; + } + Ordering::Equal => continue, + Ordering::Greater => { + debug!("Node {address} returned a newer version {response_version} than we currently have"); + } } let mut response = match client.get_metadata(request).await { @@ -207,27 +226,26 @@ impl ConnectionInfo { .map_err(|err| ConnectionInfoError::DecoderError(address.clone(), err))?; if meta.version() - > latest_meta + > latest_value .as_ref() .map(|c| c.version()) .unwrap_or(Version::INVALID) { - latest_meta = Some(meta); + latest_value = Some(meta); } - try_best_of -= 1; - if try_best_of == 0 { + stop_after_responses -= 1; + if stop_after_responses == 0 { break; } } - if !answer { - // all nodes have returned error + if !any_node_responded { return Err(ConnectionInfoError::NodesErrors(errors)); } - *guard = latest_meta.clone(); - latest_meta.ok_or(ConnectionInfoError::MissingMetadata) + *guard = latest_value.clone(); + latest_value.ok_or(ConnectionInfoError::MissingMetadata) } /// Attempts to contact each node in the cluster that matches the specified role @@ -244,14 +262,14 @@ impl ConnectionInfo { pub async fn try_each( &self, role: Option, - mut closure: F, + mut node_operation: F, ) -> Result, ConnectionInfoError> where F: FnMut(Channel) -> Fut, Fut: Future, Status>>, { let nodes_config = self.get_nodes_configuration().await?; - let mut channels = self.cache.lock().await; + let mut open_connections = self.open_connections.lock().await; let iterator = match role { Some(role) 
=> Either::Left(nodes_config.iter_role(role)), @@ -260,8 +278,8 @@ impl ConnectionInfo { .sorted_by(|a, b| { // nodes for which we already have open channels get higher precedence. match ( - channels.contains_key(&a.1.address), - channels.contains_key(&b.1.address), + open_connections.contains_key(&a.1.address), + open_connections.contains_key(&b.1.address), ) { (true, false) => Ordering::Less, (false, true) => Ordering::Greater, @@ -272,17 +290,35 @@ impl ConnectionInfo { let mut errors = NodesErrors::default(); for (_, node) in iterator { - // avoid creating new channels on each iteration. Instead cheaply copy the channels - let channel = channels - .entry(node.address.clone()) - .or_insert_with(|| grpc_channel(node.address.clone())); - - let result = closure(channel.clone()).await; - match result { - Ok(response) => return Ok(response), - Err(status) => { - errors.error(node.address.clone(), status); + let channel = self + .connect_internal(&node.address, &mut open_connections) + .await; + + if let Some(channel) = channel { + debug!("Trying {}...", node.address); + let result = node_operation(channel).await; + match result { + Ok(response) => return Ok(response), + Err(status) => { + if status.code() == Code::Unavailable + || status.code() == Code::DeadlineExceeded + { + self.dead_nodes + .write() + .unwrap() + .insert(node.address.clone()); + } + errors.error(node.address.clone(), status); + } } + } else { + errors.error( + node.address.clone(), + Status::unavailable(format!( + "Node {} was previously flagged as unreachable, not attempting to connect", + node.address + )), + ); } } @@ -292,6 +328,41 @@ impl ConnectionInfo { Err(ConnectionInfoError::NodesErrors(errors)) } } + + pub(crate) async fn connect( + &self, + address: &AdvertisedAddress, + ) -> Result { + self.connect_internal(address, &mut self.open_connections.lock().await) + .await + .map(Ok) + .unwrap_or(Err(ConnectionInfoError::NodeUnreachable)) + } + + /// Creates and returns a (lazy) connection to 
the specified address, or `None` if this + /// address was previously flagged as unreachable. + async fn connect_internal( + &self, + address: &AdvertisedAddress, + open_connections: &mut MutexGuard<'_, HashMap>, + ) -> Option { + if self.dead_nodes.read().unwrap().contains(address) { + debug!( + "Node {address} was previously flagged as unreachable, not attempting to connect" + ); + return None; + }; + + Some( + open_connections + .entry(address.clone()) + .or_insert_with(|| { + info!("Adding new connection to {address}"); + grpc_channel(address.clone()) + }) + .clone(), + ) + } } #[derive(Debug, thiserror::Error)] @@ -307,6 +378,9 @@ pub enum ConnectionInfoError { #[error(transparent)] NoAvailableNodes(NoRoleError), + + #[error("Node is unreachable")] + NodeUnreachable, } #[derive(Debug, thiserror::Error)] From 0fc6cb9659a459b2ad2a0e52c9a2a62c6c3ec2fb Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 17 Feb 2025 13:56:16 +0200 Subject: [PATCH 4/5] Lower the default CLI connect timeout to 3s --- crates/cli-util/src/opts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/cli-util/src/opts.rs b/crates/cli-util/src/opts.rs index ea6df8f26..93574066f 100644 --- a/crates/cli-util/src/opts.rs +++ b/crates/cli-util/src/opts.rs @@ -16,7 +16,7 @@ use cling::Collect; use restate_core::network::net_util::CommonClientConnectionOptions; -const DEFAULT_CONNECT_TIMEOUT: u64 = 5_000; +const DEFAULT_CONNECT_TIMEOUT: u64 = 3_000; const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000; #[derive(ValueEnum, Clone, Copy, Default, PartialEq, Eq)] From f3208bfaafa584a1bdf7a150adabe26318f3dd4a Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 17 Feb 2025 14:26:34 +0200 Subject: [PATCH 5/5] Sort metadata servers by node id --- tools/restatectl/src/commands/metadata_server/status.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/restatectl/src/commands/metadata_server/status.rs 
b/tools/restatectl/src/commands/metadata_server/status.rs index 5de225f73..d5272477c 100644 --- a/tools/restatectl/src/commands/metadata_server/status.rs +++ b/tools/restatectl/src/commands/metadata_server/status.rs @@ -70,7 +70,10 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul (node_id, metadata_store_status) } }); - let results = join_all(futures).await; + let results = join_all(futures) + .await + .into_iter() + .collect::>(); for (node_id, metadata_store_status) in results { let status = match metadata_store_status {