Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check and watch should display the same service names #101

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion iroh-p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ pub async fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBo
// TODO: make transports configurable

let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::websocket::WsConfig::new(transport.clone()).or_transport(transport);
let transport =
libp2p::websocket::WsConfig::new(libp2p::tcp::TokioTcpConfig::new().nodelay(true))
.or_transport(transport);
let transport = libp2p::dns::TokioDnsConfig::system(transport).unwrap();
let auth_config = {
let dh_keys = noise::Keypair::<noise::X25519Spec>::new()
Expand Down
26 changes: 15 additions & 11 deletions iroh-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,22 @@ mod tests {
let gateway_name = gateway::NAME;
let p2p_name = network::NAME;
let store_name = store::NAME;

// make the services with the expected service names
let (mut gateway_reporter, gateway_task) =
make_service(gateway_name, cfg.gateway_addr).await.unwrap();
let (mut p2p_reporter, p2p_task) = make_service(p2p_name, cfg.p2p_addr).await.unwrap();
let (mut store_reporter, store_task) =
make_service(store_name, cfg.store_addr).await.unwrap();
make_service(gateway::SERVICE_NAME, cfg.gateway_addr)
.await
.unwrap();
let (mut p2p_reporter, p2p_task) = make_service(network::SERVICE_NAME, cfg.p2p_addr)
.await
.unwrap();
let (mut store_reporter, store_task) = make_service(store::SERVICE_NAME, cfg.store_addr)
.await
.unwrap();
let client = Client::new(&cfg).await.unwrap();

// test `check`
// expect the names to be the hard-coded display names
let mut expect = StatusTable::new(
StatusRow::new(gateway_name, 1, ServiceStatus::Serving),
StatusRow::new(p2p_name, 1, ServiceStatus::Serving),
Expand All @@ -165,18 +173,14 @@ mod tests {
stream.next().await.unwrap();
got = stream.next().await.unwrap();

// use display names that are currently hard-wired into `Client`
expect.gateway.name = "gateway";
expect.p2p.name = "p2p";
expect.store.name = "store";
assert_eq!(expect, got);

// update gateway
expect
.update(StatusRow::new(gateway_name, 1, ServiceStatus::Unknown))
.unwrap();
gateway_reporter
.set_service_status(gateway_name, ServingStatus::Unknown)
.set_service_status(gateway::SERVICE_NAME, ServingStatus::Unknown)
.await;
let got = stream.next().await.unwrap();
assert_eq!(expect, got);
Expand All @@ -186,7 +190,7 @@ mod tests {
.update(StatusRow::new(p2p_name, 1, ServiceStatus::NotServing))
.unwrap();
p2p_reporter
.set_service_status(p2p_name, ServingStatus::NotServing)
.set_service_status(network::SERVICE_NAME, ServingStatus::NotServing)
.await;
let got = stream.next().await.unwrap();
assert_eq!(expect, got);
Expand All @@ -196,7 +200,7 @@ mod tests {
.update(StatusRow::new(store_name, 1, ServiceStatus::Unknown))
.unwrap();
store_reporter
.set_service_status(store_name, ServingStatus::Unknown)
.set_service_status(store::SERVICE_NAME, ServingStatus::Unknown)
.await;
let got = stream.next().await.unwrap();
assert_eq!(expect, got);
Expand Down
9 changes: 6 additions & 3 deletions iroh-rpc-client/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::status::{self, StatusRow};

// name that the health service registers the gateway client as
// this is derived from the protobuf definition of a `GatewayServer`
pub(crate) const NAME: &str = "gateway.Gateway";
pub(crate) const SERVICE_NAME: &str = "gateway.Gateway";

// the display name that we expect to see in the StatusTable
pub(crate) const NAME: &str = "gateway";

#[derive(Debug, Clone)]
pub struct GatewayClient {
Expand All @@ -31,11 +34,11 @@ impl GatewayClient {

#[tracing::instrument(skip(self))]
pub async fn check(&self) -> StatusRow {
status::check(self.health.clone(), NAME).await
status::check(self.health.clone(), SERVICE_NAME, NAME).await
}

#[tracing::instrument(skip(self))]
pub async fn watch(&self) -> impl Stream<Item = StatusRow> {
status::watch(self.health.clone(), NAME).await
status::watch(self.health.clone(), SERVICE_NAME, NAME).await
}
}
9 changes: 6 additions & 3 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use crate::status::{self, StatusRow};

// name that the health service registers the p2p client as
// this is derived from the protobuf definition of a `P2pServer`
pub(crate) const NAME: &str = "p2p.P2p";
pub(crate) const SERVICE_NAME: &str = "p2p.P2p";

// the display name that we expect to see in the StatusTable
pub(crate) const NAME: &str = "p2p";

#[derive(Debug, Clone)]
pub struct P2pClient {
Expand Down Expand Up @@ -118,12 +121,12 @@ impl P2pClient {

#[tracing::instrument(skip(self))]
pub async fn check(&self) -> StatusRow {
status::check(self.health.clone(), NAME).await
status::check(self.health.clone(), SERVICE_NAME, NAME).await
}

#[tracing::instrument(skip(self))]
pub async fn watch(&self) -> impl Stream<Item = StatusRow> {
status::watch(self.health.clone(), NAME).await
status::watch(self.health.clone(), SERVICE_NAME, NAME).await
}
}

Expand Down
61 changes: 31 additions & 30 deletions iroh-rpc-client/src/status.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::{gateway, network, store};
use anyhow::Result;
use async_stream::stream;
use futures::Stream;
Expand All @@ -11,7 +12,11 @@ use tonic_health::proto::{
const WAIT: tokio::time::Duration = tokio::time::Duration::from_millis(1000);

#[tracing::instrument(skip(health_client))]
pub async fn check(health_client: HealthClient<Channel>, service: &'static str) -> StatusRow {
pub async fn check(
health_client: HealthClient<Channel>,
service: &'static str,
display_name: &'static str,
) -> StatusRow {
let req = iroh_metrics::req::trace_tonic_req(HealthCheckRequest {
service: service.to_string(),
});
Expand All @@ -20,13 +25,14 @@ pub async fn check(health_client: HealthClient<Channel>, service: &'static str)
Ok(res) => res.into_inner().into(),
Err(s) => ServiceStatus::Down(s),
};
StatusRow::new(service, 1, status)
StatusRow::new(display_name, 1, status)
}

#[tracing::instrument(skip(health_client))]
pub async fn watch(
health_client: HealthClient<Channel>,
service: &'static str,
display_name: &'static str,
) -> impl Stream<Item = StatusRow> {
stream! {
loop {
Expand All @@ -38,19 +44,19 @@ pub async fn watch(
// loop over the stream, breaking if we get an error or stop receiving messages
loop {
match stream.message().await {
Ok(Some(message)) => yield StatusRow::new(service, 1, message.into()),
Ok(Some(message)) => yield StatusRow::new(display_name, 1, message.into()),
Ok(None) => {
yield StatusRow::new(service, 1, ServiceStatus::Down(tonic::Status::new(tonic::Code::Unavailable, format!("No more health messages from service `{}`", service))));
yield StatusRow::new(display_name, 1, ServiceStatus::Down(tonic::Status::new(tonic::Code::Unavailable, format!("No more health messages from service `{}`", service))));
break;
}
Err(status) => {
yield StatusRow::new(service, 1, ServiceStatus::Down(status));
yield StatusRow::new(display_name, 1, ServiceStatus::Down(status));
break;
}
}
}
},
Err(status) => yield StatusRow::new(service, 1, ServiceStatus::Down(status)),
Err(status) => yield StatusRow::new(display_name, 1, ServiceStatus::Down(status)),
}
/// wait before attempting to start a watch stream again
tokio::time::sleep(WAIT).await;
Expand Down Expand Up @@ -169,34 +175,29 @@ impl StatusTable {
}
}

pub fn update(&mut self, mut s: StatusRow) -> Result<()> {
match s.name {
crate::gateway::NAME | "gateway" => {
s.name = "gateway";
self.gateway = s;
Ok(())
}
crate::network::NAME | "p2p" => {
s.name = "p2p";
self.p2p = s;
Ok(())
}
crate::store::NAME | "store" => {
s.name = "store";
self.store = s;
Ok(())
}
_ => Err(anyhow::anyhow!("unknown service {}", s.name)),
pub fn update(&mut self, s: StatusRow) -> Result<()> {
if self.gateway.name() == s.name() {
self.gateway = s;
return Ok(());
}
if self.p2p.name() == s.name() {
self.p2p = s;
return Ok(());
}
if self.store.name() == s.name() {
self.store = s;
return Ok(());
}
Err(anyhow::anyhow!("unknown service {}", s.name))
}
}

impl Default for StatusTable {
fn default() -> Self {
Self {
gateway: StatusRow::new(crate::gateway::NAME, 1, ServiceStatus::Unknown),
p2p: StatusRow::new(crate::network::NAME, 1, ServiceStatus::Unknown),
store: StatusRow::new(crate::store::NAME, 1, ServiceStatus::Unknown),
gateway: StatusRow::new(gateway::NAME, 1, ServiceStatus::Unknown),
p2p: StatusRow::new(network::NAME, 1, ServiceStatus::Unknown),
store: StatusRow::new(store::NAME, 1, ServiceStatus::Unknown),
}
}
}
Expand Down Expand Up @@ -282,9 +283,9 @@ mod tests {

#[test]
fn status_table_update() {
let mut gateway = StatusRow::new("gateway", 1, ServiceStatus::Unknown);
let mut p2p = StatusRow::new("p2p", 1, ServiceStatus::Unknown);
let mut store = StatusRow::new("store", 1, ServiceStatus::Unknown);
let mut gateway = StatusRow::new(gateway::NAME, 1, ServiceStatus::Unknown);
let mut p2p = StatusRow::new(network::NAME, 1, ServiceStatus::Unknown);
let mut store = StatusRow::new(store::NAME, 1, ServiceStatus::Unknown);
let mut got = StatusTable::new(gateway.clone(), p2p.clone(), store.clone());

store.status = ServiceStatus::Serving;
Expand Down
9 changes: 6 additions & 3 deletions iroh-rpc-client/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ pub struct StoreClient {

// name that the health service registers the store client as
// this is derived from the protobuf definition of a `StoreServer`
pub const NAME: &str = "store.Store";
pub const SERVICE_NAME: &str = "store.Store";

// the display name that we expect to see in the StatusTable
pub(crate) const NAME: &str = "store";

impl StoreClient {
pub async fn new(addr: SocketAddr) -> Result<Self> {
Expand Down Expand Up @@ -84,11 +87,11 @@ impl StoreClient {

#[tracing::instrument(skip(self))]
pub async fn check(&self) -> StatusRow {
status::check(self.health.clone(), NAME).await
status::check(self.health.clone(), SERVICE_NAME, NAME).await
}

#[tracing::instrument(skip(self))]
pub async fn watch(&self) -> impl Stream<Item = StatusRow> {
status::watch(self.health.clone(), NAME).await
status::watch(self.health.clone(), SERVICE_NAME, NAME).await
}
}