Skip to content

Commit

Permalink
Prometheus now publishes connected clients
Browse files Browse the repository at this point in the history
Connected ip/ports are published in prometheus now for each
service.
  • Loading branch information
allada committed Aug 11, 2023
1 parent 63f7393 commit 29fa44a
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 16 deletions.
36 changes: 35 additions & 1 deletion Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "461c6fc060010e68f913e54c1f63f256f3ff70f1bb37216541fd347b5f56be53",
"checksum": "dbd5cb231b829ad5fecb66047898c06e0f97a3c67302fbe78854dfbd0917ae48",
"crates": {
"addr2line 0.20.0": {
"name": "addr2line",
Expand Down Expand Up @@ -2687,6 +2687,36 @@
},
"license": "MIT OR Apache-2.0"
},
"drop_guard 0.3.0": {
"name": "drop_guard",
"version": "0.3.0",
"repository": {
"Http": {
"url": "https://crates.io/api/v1/crates/drop_guard/0.3.0/download",
"sha256": "2c4a817d8b683f6e649aed359aab0c47a875377516bb5791d0f7e46d9066d209"
}
},
"targets": [
{
"Library": {
"crate_name": "drop_guard",
"crate_root": "src/lib.rs",
"srcs": [
"**/*.rs"
]
}
}
],
"library_target_name": "drop_guard",
"common_attrs": {
"compile_data_glob": [
"**"
],
"edition": "2018",
"version": "0.3.0"
},
"license": "MIT/Apache-2.0"
},
"dtoa 1.0.9": {
"name": "dtoa",
"version": "1.0.9",
Expand Down Expand Up @@ -2767,6 +2797,10 @@
"id": "clap 4.3.15",
"target": "clap"
},
{
"id": "drop_guard 0.3.0",
"target": "drop_guard"
},
{
"id": "env_logger 0.10.0",
"target": "env_logger"
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ axum = "0.6.18"
tower = "0.4.13"
prometheus-client = "0.21.2"
blake3 = "1.4.1"
drop_guard = "0.3.0"

[dev-dependencies]
stdext = "0.3.1"
Expand Down
2 changes: 2 additions & 0 deletions cas/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ rust_binary(
"//util:error",
"//util:prometheus_utils",
"@crate_index//:clap",
"@crate_index//:drop_guard",
"@crate_index//:env_logger",
"@crate_index//:futures",
"@crate_index//:hyper",
"@crate_index//:json5",
"@crate_index//:parking_lot",
"@crate_index//:prometheus-client",
"@crate_index//:axum",
"@crate_index//:tower",
Expand Down
87 changes: 73 additions & 14 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

use std::collections::HashMap;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;

use axum::Router;
use clap::Parser;
use futures::future::{ok, select_all, BoxFuture, OptionFuture, TryFutureExt};
use hyper::service::make_service_fn;
use hyper::{Body, Response, Server};
use drop_guard::guard;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use hyper::server::conn::Http;
use hyper::{Body, Response};
use parking_lot::Mutex;
use runfiles::Runfiles;
use tokio::net::TcpListener;
use tokio::task::spawn_blocking;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server as TonicServer;
Expand All @@ -32,13 +36,14 @@ use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::set_open_file_limit;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, WorkerConfig};
use common::log;
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, ServerConfig, WorkerConfig};
use default_scheduler_factory::scheduler_factory;
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
use prometheus_utils::{set_metrics_enabled_for_this_thread, Registry};
use prometheus_utils::{set_metrics_enabled_for_this_thread, Collector, CollectorState, MetricsComponent, Registry};
use store::StoreManager;
use worker_api_server::WorkerApiServer;

Expand Down Expand Up @@ -156,9 +161,51 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
}
}

/// Simple wrapper around the set of connected client addresses so it can
/// be registered as a metrics collector and report which clients are
/// currently connected to a server.
struct ConnectedClientsMetrics {
// Remote socket addresses of the connected clients, guarded by a mutex
// so the set can be updated through a shared reference.
inner: Mutex<HashSet<SocketAddr>>,
}
impl MetricsComponent for ConnectedClientsMetrics {
    /// Publishes one `connected_clients` sample (value `1`) for every
    /// address currently held in the set, labelled with that address as
    /// `endpoint`.
    fn gather_metrics(&self, c: &mut CollectorState) {
        // Hold the lock for the whole pass so the set cannot change while
        // it is being reported.
        let endpoints = self.inner.lock();
        endpoints.iter().for_each(|addr| {
            c.publish_with_labels(
                "connected_clients",
                &1,
                "The endpoint of the connected clients",
                vec![("endpoint".into(), addr.to_string().into())],
            );
        });
    }
}

// Registers a ConnectedClientsMetrics collector for every server and
// pairs it with that server's config. This is done before the loop
// below because `root_metrics_registry` is made immutable (wrapped in an
// `Arc`) before the loop starts, so nothing can be registered inside it.
let servers_and_clients: Vec<(ServerConfig, _)> = cfg
.servers
.into_iter()
.enumerate()
.map(|(i, server_cfg)| {
let name = if server_cfg.name.is_empty() {
format!("{}", i)
} else {
server_cfg.name.clone()
};
let connected_clients_mux = Arc::new(ConnectedClientsMetrics {
inner: Mutex::new(HashSet::new()),
});
let server_metrics = root_metrics_registry.sub_registry_with_prefix(format!("server_{}", name));
server_metrics.register_collector(Box::new(Collector::new(&connected_clients_mux)));

(server_cfg, connected_clients_mux)
})
.collect();

// Lock our registry as immutable and clonable.
let root_metrics_registry = Arc::new(root_metrics_registry);
for server_cfg in cfg.servers {
for (server_cfg, connected_clients_mux) in servers_and_clients {
let services = server_cfg.services.ok_or("'services' must be configured")?;

let tonic_services = TonicServer::builder()
Expand Down Expand Up @@ -368,14 +415,26 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
)
}

futures.push(Box::pin(
tokio::spawn(
Server::bind(&server_cfg.listen_address.parse()?)
.serve(make_service_fn(move |_conn| ok::<_, Error>(svc.clone())))
.map_err(|e| make_err!(Code::Internal, "Failed running service : {:?}", e)),
)
.map_ok_or_else(|e| Err(e.into()), |v| v),
));
let tcp_listener = TcpListener::bind(&server_cfg.listen_address.parse::<SocketAddr>()?).await?;
futures.push(Box::pin(async move {
loop {
let (tcp_stream, remote_addr) = tcp_listener.accept().await?;
connected_clients_mux.inner.lock().insert(remote_addr);
// This is the safest way to guarantee that if our future
// is ever dropped we will clean up our data.
let drop_guard = guard(connected_clients_mux.clone(), move |connected_clients_mux| {
connected_clients_mux.inner.lock().remove(&remote_addr);
});
let fut = Http::new().serve_connection(tcp_stream, svc.clone());
tokio::spawn(async move {
// Move it into our spawn, so if our spawn dies the cleanup happens.
let _drop_guard = drop_guard;
if let Err(e) = fut.await {
log::error!("Failed running service : {:?}", e);
}
});
}
}));
}

if let Err(e) = select_all(futures).await.0 {
Expand Down
7 changes: 7 additions & 0 deletions config/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ pub struct ServicesConfig {

#[derive(Deserialize, Debug)]
pub struct ServerConfig {
/// Name of the server. This is used to help identify the service
/// for telemetry and logs.
///
/// Default: (index of server in config)
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub name: String,

/// Address to listen on. Example: `127.0.0.1:8080` or `:8080` to listen
/// to all IPs.
#[serde(deserialize_with = "convert_string_with_shellexpand")]
Expand Down
2 changes: 2 additions & 0 deletions config/examples/basic_cas.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
}
}],
"servers": [{
"name": "public",
"listen_address": "0.0.0.0:50051",
"services": {
"cas": {
Expand Down Expand Up @@ -126,6 +127,7 @@
}
}
}, {
"name": "private_workers_servers",
"listen_address": "0.0.0.0:50061",
"services": {
// Note: This should be served on a different port, because it has
Expand Down
2 changes: 1 addition & 1 deletion deployment-examples/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ variable "build_x86_instance_type" {
}

variable "terminate_ami_builder" {
description = "If we should terminate the AMI builder instances. Disabling this will development easier, but cost more money."
description = "If we should terminate the AMI builder instances. Disabling this will make development easier, but cost more money."
default = true
}

Expand Down

0 comments on commit 29fa44a

Please sign in to comment.