Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[restatectl] Improve performance and information display with unprovisioned clusters or under partial node connectivity #2748

Merged
merged 5 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli-util/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use cling::Collect;

use restate_core::network::net_util::CommonClientConnectionOptions;

const DEFAULT_CONNECT_TIMEOUT: u64 = 5_000;
const DEFAULT_CONNECT_TIMEOUT: u64 = 3_000;
const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000;

#[derive(ValueEnum, Clone, Copy, Default, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions tools/restatectl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ cling = { workspace = true }
crossterm = { version = "0.27.0" }
ctrlc = { version = "3.4" }
diff = "0.1.13"
futures = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
json-patch = "2.0.0"
Expand Down
48 changes: 26 additions & 22 deletions tools/restatectl/src/commands/log/list_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any

let mut logs_table = Table::new_styled();

c_println!("Log chain {}", logs.version());
c_println!("Logs {}", logs.version());

write_default_provider(
&mut Console::stdout(),
Expand All @@ -45,29 +45,33 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any
// sort by log-id for display
let logs: BTreeMap<LogId, &Chain> = logs.iter().map(|(id, chain)| (*id, chain)).collect();

for (log_id, chain) in logs {
let params = deserialize_replicated_log_params(&chain.tail());
logs_table.add_row(vec![
Cell::new(log_id),
Cell::new(format!("{}", &chain.tail().base_lsn)),
Cell::new(format!("{:?}", chain.tail().config.kind)),
render_loglet_params(&params, |p| Cell::new(p.loglet_id)),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.replication))),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.sequencer))),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.nodeset))),
if logs.is_empty() {
c_println!("No logs found. Has the cluster been provisioned yet?");
} else {
for (log_id, chain) in logs {
let params = deserialize_replicated_log_params(&chain.tail());
logs_table.add_row(vec![
Cell::new(log_id),
Cell::new(format!("{}", &chain.tail().base_lsn)),
Cell::new(format!("{:?}", chain.tail().config.kind)),
render_loglet_params(&params, |p| Cell::new(p.loglet_id)),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.replication))),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.sequencer))),
render_loglet_params(&params, |p| Cell::new(format!("{:#}", p.nodeset))),
]);
}

logs_table.set_styled_header(vec![
"L-ID",
"FROM-LSN",
"KIND",
"LOGLET-ID",
"REPLICATION",
"SEQUENCER",
"NODESET",
]);
c_println!("{}", logs_table);
}

logs_table.set_styled_header(vec![
"L-ID",
"FROM-LSN",
"KIND",
"LOGLET-ID",
"REPLICATION",
"SEQUENCER",
"NODESET",
]);
c_println!("{}", logs_table);

Ok(())
}
4 changes: 2 additions & 2 deletions tools/restatectl/src/commands/log/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn inner_reconfigure(
c_println!(" ├ Tail LSN: {}", sealed_segment.tail_offset);
c_println!(" ├ Provider: {}", sealed_segment.provider);
let Ok(provider) = ProviderKind::from_str(&sealed_segment.provider, true) else {
c_eprintln!("Unkown provider type '{}'", sealed_segment.provider);
c_eprintln!("Unknown provider type '{}'", sealed_segment.provider);
return Ok(());
};

Expand Down Expand Up @@ -200,7 +200,7 @@ fn replicated_loglet_params(
ReplicatedLogletParams {
loglet_id,
nodeset: if opts.nodeset.is_empty() {
anyhow::bail!("Missing nodeset. Nodeset is required if last segment is not of replicated type");
bail!("Missing nodeset. Nodeset is required if last segment is not of replicated type");
} else {
NodeSet::from_iter(opts.nodeset.iter().cloned())
},
Expand Down
43 changes: 34 additions & 9 deletions tools/restatectl/src/commands/metadata_server/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

use std::collections::BTreeMap;

use futures::future::join_all;
use itertools::Itertools;
use tonic::codec::CompressionEncoding;
use tonic::IntoRequest;
use tonic::{IntoRequest, Status};
use tracing::debug;

use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::c_println;
Expand All @@ -23,9 +25,10 @@ use restate_types::protobuf::common::MetadataServerStatus;
use restate_types::{PlainNodeId, Version};

use crate::connection::ConnectionInfo;
use crate::util::grpc_channel;

pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Result<()> {
debug!("Gathering metadata server status information");

let nodes_configuration = connection.get_nodes_configuration().await?;
let mut metadata_nodes_table = Table::new_styled();
let header = vec![
Expand All @@ -45,13 +48,34 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul

let mut unreachable_nodes = BTreeMap::default();

for (node_id, node_config) in nodes_configuration.iter_role(Role::MetadataServer) {
let metadata_channel = grpc_channel(node_config.address.clone());
let mut metadata_client = MetadataServerSvcClient::new(metadata_channel)
.accept_compressed(CompressionEncoding::Gzip);

let metadata_store_status = metadata_client.status(().into_request()).await;

let futures =
nodes_configuration
.iter_role(Role::MetadataServer)
.map(|(node_id, node_config)| {
let address = node_config.address.clone();
async move {
let channel = match connection.connect(&address).await {
Ok(channel) => channel,
Err(conn_error) => {
return (node_id, Err(Status::from_error(Box::new(conn_error))));
}
};

let mut metadata_client = MetadataServerSvcClient::new(channel)
.accept_compressed(CompressionEncoding::Gzip);

debug!("Querying metadata service status on node {address}");
let metadata_store_status = metadata_client.status(().into_request()).await;

(node_id, metadata_store_status)
}
});
let results = join_all(futures)
.await
.into_iter()
.collect::<BTreeMap<_, _>>();

for (node_id, metadata_store_status) in results {
let status = match metadata_store_status {
Ok(response) => response.into_inner(),
Err(err) => {
Expand Down Expand Up @@ -119,6 +143,7 @@ pub async fn list_metadata_servers(connection: &ConnectionInfo) -> anyhow::Resul
]);
}

c_println!("Metadata service");
c_println!("{}", metadata_nodes_table);

if !unreachable_nodes.is_empty() {
Expand Down
Loading
Loading