Skip to content

Commit

Permalink
Remember unresponsive nodes, refresh metadata service status concurre…
Browse files Browse the repository at this point in the history
…ntly
  • Loading branch information
pcholakov committed Feb 17, 2025
1 parent cdc4026 commit f1149a9
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ cling = { workspace = true }
crossterm = { version = "0.27.0" }
ctrlc = { version = "3.4" }
diff = "0.1.13"
futures = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
json-patch = "2.0.0"
Expand Down
39 changes: 30 additions & 9 deletions tools/restatectl/src/commands/metadata_server/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

use std::collections::BTreeMap;

use futures::future::join_all;
use itertools::Itertools;
use tonic::codec::CompressionEncoding;
use tonic::IntoRequest;
use tonic::{IntoRequest, Status};
use tracing::debug;

use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::c_println;
Expand All @@ -23,9 +25,10 @@ use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::{PlainNodeId, Version};

use crate::connection::ConnectionInfo;
use crate::util::grpc_channel;

pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Result<()> {
debug!("Gathering metadata server status information");

let nodes_configuration = connection.get_nodes_configuration().await?;
let mut metadata_nodes_table = Table::new_styled();
let header = vec![
Expand All @@ -45,13 +48,31 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul

let mut unreachable_nodes = BTreeMap::default();

for (node_id, node_config) in nodes_configuration.iter_role(Role::MetadataServer) {
let metadata_channel = grpc_channel(node_config.address.clone());
let mut metadata_client = MetadataServerSvcClient::new(metadata_channel)
.accept_compressed(CompressionEncoding::Gzip);

let metadata_store_status = metadata_client.status(().into_request()).await;

let futures =
nodes_configuration
.iter_role(Role::MetadataServer)
.map(|(node_id, node_config)| {
let address = node_config.address.clone();
async move {
let channel = match connection.connect(&address).await {
Ok(channel) => channel,
Err(conn_error) => {
return (node_id, Err(Status::from_error(Box::new(conn_error))));
}
};

let mut metadata_client = MetadataServerSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip);

debug!("Querying metadata service status on node {address}");
let metadata_store_status = metadata_client.status(().into_request()).await;

(node_id, metadata_store_status)
}
});
let results = join_all(futures).await;

for (node_id, metadata_store_status) in results {
let status = match metadata_store_status {
Ok(response) => response.into_inner(),
Err(err) => {
Expand Down
Loading

0 comments on commit f1149a9

Please sign in to comment.