diff --git a/iroh-p2p/src/service.rs b/iroh-p2p/src/service.rs index 7ba9b3117c..a782f57674 100644 --- a/iroh-p2p/src/service.rs +++ b/iroh-p2p/src/service.rs @@ -404,7 +404,9 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo // TODO: make transports configurable let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true); - let transport = libp2p::websocket::WsConfig::new(transport.clone()).or_transport(transport); + let transport = + libp2p::websocket::WsConfig::new(libp2p::tcp::TokioTcpConfig::new().nodelay(true)) + .or_transport(transport); let transport = libp2p::dns::TokioDnsConfig::system(transport).unwrap(); let auth_config = { let dh_keys = noise::Keypair::::new() diff --git a/iroh-rpc-client/src/client.rs b/iroh-rpc-client/src/client.rs index ad3d0ef072..0148622871 100644 --- a/iroh-rpc-client/src/client.rs +++ b/iroh-rpc-client/src/client.rs @@ -140,14 +140,22 @@ mod tests { let gateway_name = gateway::NAME; let p2p_name = network::NAME; let store_name = store::NAME; + + // make the services with the expected service names let (mut gateway_reporter, gateway_task) = - make_service(gateway_name, cfg.gateway_addr).await.unwrap(); - let (mut p2p_reporter, p2p_task) = make_service(p2p_name, cfg.p2p_addr).await.unwrap(); - let (mut store_reporter, store_task) = - make_service(store_name, cfg.store_addr).await.unwrap(); + make_service(gateway::SERVICE_NAME, cfg.gateway_addr) + .await + .unwrap(); + let (mut p2p_reporter, p2p_task) = make_service(network::SERVICE_NAME, cfg.p2p_addr) + .await + .unwrap(); + let (mut store_reporter, store_task) = make_service(store::SERVICE_NAME, cfg.store_addr) + .await + .unwrap(); let client = Client::new(&cfg).await.unwrap(); // test `check` + // expect the names to be the hard-coded display names let mut expect = StatusTable::new( StatusRow::new(gateway_name, 1, ServiceStatus::Serving), StatusRow::new(p2p_name, 1, ServiceStatus::Serving), @@ -165,10 +173,6 @@ mod tests { stream.next().await.unwrap(); got = stream.next().await.unwrap(); - // use display names that are currently hard-wired into `Client` - expect.gateway.name = "gateway"; - expect.p2p.name = "p2p"; - expect.store.name = "store"; assert_eq!(expect, got); // update gateway @@ -176,7 +180,7 @@ mod tests { .update(StatusRow::new(gateway_name, 1, ServiceStatus::Unknown)) .unwrap(); gateway_reporter - .set_service_status(gateway_name, ServingStatus::Unknown) + .set_service_status(gateway::SERVICE_NAME, ServingStatus::Unknown) .await; let got = stream.next().await.unwrap(); assert_eq!(expect, got); @@ -186,7 +190,7 @@ mod tests { .update(StatusRow::new(p2p_name, 1, ServiceStatus::NotServing)) .unwrap(); p2p_reporter - .set_service_status(p2p_name, ServingStatus::NotServing) + .set_service_status(network::SERVICE_NAME, ServingStatus::NotServing) .await; let got = stream.next().await.unwrap(); assert_eq!(expect, got); @@ -196,7 +200,7 @@ mod tests { .update(StatusRow::new(store_name, 1, ServiceStatus::Unknown)) .unwrap(); store_reporter - .set_service_status(store_name, ServingStatus::Unknown) + .set_service_status(store::SERVICE_NAME, ServingStatus::Unknown) .await; let got = stream.next().await.unwrap(); assert_eq!(expect, got); diff --git a/iroh-rpc-client/src/gateway.rs b/iroh-rpc-client/src/gateway.rs index f036445872..48d090b704 100644 --- a/iroh-rpc-client/src/gateway.rs +++ b/iroh-rpc-client/src/gateway.rs @@ -9,7 +9,10 @@ use crate::status::{self, StatusRow}; // name that the health service registers the gateway client as // this is derived from the protobuf definition of a `GatewayServer` -pub(crate) const NAME: &str = "gateway.Gateway"; +pub(crate) const SERVICE_NAME: &str = "gateway.Gateway"; + +// the display name that we expect to see in the StatusTable +pub(crate) const NAME: &str = "gateway"; #[derive(Debug, Clone)] pub struct GatewayClient { @@ -31,11 +34,11 @@ impl GatewayClient { #[tracing::instrument(skip(self))] pub async fn check(&self) -> StatusRow { - status::check(self.health.clone(), NAME).await + status::check(self.health.clone(), SERVICE_NAME, NAME).await } #[tracing::instrument(skip(self))] pub async fn watch(&self) -> impl Stream { - status::watch(self.health.clone(), NAME).await + status::watch(self.health.clone(), SERVICE_NAME, NAME).await } } diff --git a/iroh-rpc-client/src/network.rs b/iroh-rpc-client/src/network.rs index 88cb072fda..51b118f88a 100644 --- a/iroh-rpc-client/src/network.rs +++ b/iroh-rpc-client/src/network.rs @@ -17,7 +17,10 @@ use crate::status::{self, StatusRow}; // name that the health service registers the p2p client as // this is derived from the protobuf definition of a `P2pServer` -pub(crate) const NAME: &str = "p2p.P2p"; +pub(crate) const SERVICE_NAME: &str = "p2p.P2p"; + +// the display name that we expect to see in the StatusTable +pub(crate) const NAME: &str = "p2p"; #[derive(Debug, Clone)] pub struct P2pClient { @@ -118,12 +121,12 @@ impl P2pClient { #[tracing::instrument(skip(self))] pub async fn check(&self) -> StatusRow { - status::check(self.health.clone(), NAME).await + status::check(self.health.clone(), SERVICE_NAME, NAME).await } #[tracing::instrument(skip(self))] pub async fn watch(&self) -> impl Stream { - status::watch(self.health.clone(), NAME).await + status::watch(self.health.clone(), SERVICE_NAME, NAME).await } } diff --git a/iroh-rpc-client/src/status.rs b/iroh-rpc-client/src/status.rs index f34b70b4df..8f00fdf94a 100644 --- a/iroh-rpc-client/src/status.rs +++ b/iroh-rpc-client/src/status.rs @@ -1,3 +1,4 @@ +use crate::{gateway, network, store}; use anyhow::Result; use async_stream::stream; use futures::Stream; @@ -11,7 +12,11 @@ use tonic_health::proto::{ const WAIT: tokio::time::Duration = tokio::time::Duration::from_millis(1000); #[tracing::instrument(skip(health_client))] -pub async fn check(health_client: HealthClient, service: &'static str) -> StatusRow { +pub async fn check( + health_client: HealthClient, + service: &'static str, + display_name: &'static str, +) -> StatusRow { let req = iroh_metrics::req::trace_tonic_req(HealthCheckRequest { service: service.to_string(), }); @@ -20,13 +25,14 @@ pub async fn check(health_client: HealthClient, service: &'static str) Ok(res) => res.into_inner().into(), Err(s) => ServiceStatus::Down(s), }; - StatusRow::new(service, 1, status) + StatusRow::new(display_name, 1, status) } #[tracing::instrument(skip(health_client))] pub async fn watch( health_client: HealthClient, service: &'static str, + display_name: &'static str, ) -> impl Stream { stream! { loop { @@ -38,19 +44,19 @@ pub async fn watch( // loop over the stream, breaking if we get an error or stop receiving messages loop { match stream.message().await { - Ok(Some(message)) => yield StatusRow::new(service, 1, message.into()), + Ok(Some(message)) => yield StatusRow::new(display_name, 1, message.into()), Ok(None) => { - yield StatusRow::new(service, 1, ServiceStatus::Down(tonic::Status::new(tonic::Code::Unavailable, format!("No more health messages from service `{}`", service)))); + yield StatusRow::new(display_name, 1, ServiceStatus::Down(tonic::Status::new(tonic::Code::Unavailable, format!("No more health messages from service `{}`", service)))); break; } Err(status) => { - yield StatusRow::new(service, 1, ServiceStatus::Down(status)); + yield StatusRow::new(display_name, 1, ServiceStatus::Down(status)); break; } } } }, - Err(status) => yield StatusRow::new(service, 1, ServiceStatus::Down(status)), + Err(status) => yield StatusRow::new(display_name, 1, ServiceStatus::Down(status)), } /// wait before attempting to start a watch stream again tokio::time::sleep(WAIT).await; @@ -169,34 +175,29 @@ impl StatusTable { } } - pub fn update(&mut self, mut s: StatusRow) -> Result<()> { - match s.name { - crate::gateway::NAME | "gateway" => { - s.name = "gateway"; - self.gateway = s; - Ok(()) - } - crate::network::NAME | "p2p" => { - s.name = "p2p"; - self.p2p = s; - Ok(()) - } - crate::store::NAME | "store" => { - s.name = "store"; - self.store = s; - Ok(()) - } - _ => Err(anyhow::anyhow!("unknown service {}", s.name)), + pub fn update(&mut self, s: StatusRow) -> Result<()> { + if self.gateway.name() == s.name() { + self.gateway = s; + return Ok(()); + } + if self.p2p.name() == s.name() { + self.p2p = s; + return Ok(()); + } + if self.store.name() == s.name() { + self.store = s; + return Ok(()); } + Err(anyhow::anyhow!("unknown service {}", s.name)) } } impl Default for StatusTable { fn default() -> Self { Self { - gateway: StatusRow::new(crate::gateway::NAME, 1, ServiceStatus::Unknown), - p2p: StatusRow::new(crate::network::NAME, 1, ServiceStatus::Unknown), - store: StatusRow::new(crate::store::NAME, 1, ServiceStatus::Unknown), + gateway: StatusRow::new(gateway::NAME, 1, ServiceStatus::Unknown), + p2p: StatusRow::new(network::NAME, 1, ServiceStatus::Unknown), + store: StatusRow::new(store::NAME, 1, ServiceStatus::Unknown), } } } @@ -282,9 +283,9 @@ mod tests { #[test] fn status_table_update() { - let mut gateway = StatusRow::new("gateway", 1, ServiceStatus::Unknown); - let mut p2p = StatusRow::new("p2p", 1, ServiceStatus::Unknown); - let mut store = StatusRow::new("store", 1, ServiceStatus::Unknown); + let mut gateway = StatusRow::new(gateway::NAME, 1, ServiceStatus::Unknown); + let mut p2p = StatusRow::new(network::NAME, 1, ServiceStatus::Unknown); + let mut store = StatusRow::new(store::NAME, 1, ServiceStatus::Unknown); let mut got = StatusTable::new(gateway.clone(), p2p.clone(), store.clone()); store.status = ServiceStatus::Serving; diff --git a/iroh-rpc-client/src/store.rs b/iroh-rpc-client/src/store.rs index 0b7061c5a4..c1eb6027a5 100644 --- a/iroh-rpc-client/src/store.rs +++ b/iroh-rpc-client/src/store.rs @@ -19,7 +19,10 @@ pub struct StoreClient { // name that the health service registers the store client as // this is derived from the protobuf definition of a `StoreServer` -pub const NAME: &str = "store.Store"; +pub const SERVICE_NAME: &str = "store.Store"; + +// the display name that we expect to see in the StatusTable +pub(crate) const NAME: &str = "store"; impl StoreClient { pub async fn new(addr: SocketAddr) -> Result { @@ -84,11 +87,11 @@ impl StoreClient { #[tracing::instrument(skip(self))] pub async fn check(&self) -> StatusRow { - status::check(self.health.clone(), NAME).await + status::check(self.health.clone(), SERVICE_NAME, NAME).await } #[tracing::instrument(skip(self))] pub async fn watch(&self) -> impl Stream { - status::watch(self.health.clone(), NAME).await + status::watch(self.health.clone(), SERVICE_NAME, NAME).await } }