From e72e60102e0ffad177c02fc9fff903ed52992325 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Mon, 22 Mar 2021 14:05:03 -0700 Subject: [PATCH] Pull `Admin` module out of `Metrics` This pulls the `hyper` server out of the `Metrics` module and moves it into its own `Admin` module that handles metrics, and in the future, the health/liveness endpoint as well. Closes #101 --- README.md | 1 + docs/admin.md | 21 ++++++ src/config/builder.rs | 4 ++ src/lib.rs | 2 +- src/proxy/admin.rs | 83 ++++++++++++++++++++++ src/proxy/builder.rs | 21 ++++-- src/proxy/metrics.rs | 150 +++++++++++++++------------------------- src/proxy/mod.rs | 2 + src/proxy/server/mod.rs | 38 +++++----- src/runner.rs | 11 +-- src/test_utils.rs | 48 +++++++------ tests/metrics.rs | 10 +-- 12 files changed, 232 insertions(+), 159 deletions(-) create mode 100644 docs/admin.md create mode 100644 src/proxy/admin.rs diff --git a/README.md b/README.md index 42dfb6e239..0cf56a7231 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Not to be used in production systems. * See the [proxy configuration reference](./docs/proxy-configuration.md) for all the configuration options. * See the [Session documentation](./docs/session.md) for an overview of quilkin sessions and metrics. * See [Filter documentation](./docs/extensions/filters/filters.md) for a list of filters, and their configuration options. +* The [Administration interface](./docs/admin.md) provides access to health and metrics endpoints. ## Code of Conduct diff --git a/docs/admin.md b/docs/admin.md new file mode 100644 index 0000000000..4809fe68fa --- /dev/null +++ b/docs/admin.md @@ -0,0 +1,21 @@ +# Administration Interface + +Quilkin exposes an HTTP interface to query different aspects of the server. + +> It is assumed that the administration interface will only ever be accessible on `localhost`. 
+ +By default, the administration interface is bound to `[::]:9091`, but it can be configured through the +[proxy configuration file](./proxy-configuration.md), like so: + +```yaml +admin: + address: "[::]:9095" +``` + +The admin interface provides the following endpoints: + +## /metrics + +Outputs [Prometheus](https://prometheus.io/) formatted metrics for this proxy. + +See the [Proxy Metrics](./proxy.md#metrics) documentation for what metrics are available. diff --git a/src/config/builder.rs b/src/config/builder.rs index dac3014b93..dc81fb660b 100644 --- a/src/config/builder.rs +++ b/src/config/builder.rs @@ -47,6 +47,10 @@ impl Builder { Builder { source, ..self } } + pub fn with_admin(self, admin: Admin) -> Self { + Self { admin, ..self } + } + pub fn build(self) -> Config { Config { version: Version::V1Alpha1, diff --git a/src/lib.rs b/src/lib.rs index aa26a8e8a0..f608e658cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ mod cluster; pub mod config; pub mod extensions; -pub mod metrics; +pub(crate) mod metrics; pub mod proxy; pub mod runner; pub mod test_utils; diff --git a/src/proxy/admin.rs b/src/proxy/admin.rs new file mode 100644 index 0000000000..856c97a67d --- /dev/null +++ b/src/proxy/admin.rs @@ -0,0 +1,83 @@ +/* + * Copyright 2021 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server as HyperServer, StatusCode}; +use slog::{error, info, o, Logger}; +use tokio::sync::watch; + +use crate::proxy::Metrics; + +pub struct Admin { + log: Logger, + /// The address that the Admin server starts on + addr: SocketAddr, + metrics: Arc<Metrics>, +} + +impl Admin { + pub fn new(base: &Logger, addr: SocketAddr, metrics: Arc<Metrics>) -> Self { + Admin { + log: base.new(o!("source" => "proxy::Admin")), + addr, + metrics, + } + } + + pub fn run(&self, mut shutdown_rx: watch::Receiver<()>) { + info!(self.log, "Starting admin endpoint"; "address" => self.addr.to_string()); + + let metrics = self.metrics.clone(); + let make_svc = make_service_fn(move |_conn| { + let metrics = metrics.clone(); + async move { + let metrics = metrics.clone(); + Ok::<_, Infallible>(service_fn(move |req| { + let metrics = metrics.clone(); + async move { Ok::<_, Infallible>(handle_request(req, metrics)) } + })) + } + }); + + let server = HyperServer::bind(&self.addr) + .serve(make_svc) + .with_graceful_shutdown(async move { + shutdown_rx.changed().await.ok(); + }); + + let log = self.log.clone(); + tokio::spawn(async move { + if let Err(err) = server.await { + error!(log, "Admin server exited with an error"; "error" => %err); + } + }); + } +} + +fn handle_request(request: Request<Body>, metrics: Arc<Metrics>) -> Response<Body> { + match (request.method(), request.uri().path()) { + (&Method::GET, "/metrics") => metrics.admin_response(), + (_, _) => { + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::NOT_FOUND; + response + } + } +} diff --git a/src/proxy/builder.rs b/src/proxy/builder.rs index 65031b8457..754d4f58a3 100644 --- a/src/proxy/builder.rs +++ b/src/proxy/builder.rs @@ -21,7 +21,8 @@ use crate::config::{ }; use crate::extensions::{default_registry, CreateFilterError, FilterChain, 
FilterRegistry}; use crate::proxy::server::metrics::Metrics as ProxyMetrics; -use crate::proxy::{Metrics, Server}; +use crate::proxy::{Admin as ProxyAdmin, Metrics, Server}; +use prometheus::Registry; use slog::{o, Drain, Logger}; use std::collections::HashSet; use std::convert::TryInto; @@ -108,17 +109,21 @@ pub struct Builder { log: Logger, config: Arc, filter_registry: FilterRegistry, - metrics: Metrics, + admin: Option, + metrics: Arc, validation_status: V, } impl From> for Builder { fn from(config: Arc) -> Self { let log = logger(); + let metrics = Arc::new(Metrics::new(&log, Registry::default())); + let admin = ProxyAdmin::new(&log, config.admin.address, metrics.clone()); Builder { config, filter_registry: default_registry(&log), - metrics: Metrics::default(), + admin: Some(admin), + metrics, log, validation_status: PendingValidation, } @@ -249,8 +254,12 @@ impl Builder { } } - pub fn with_metrics(self, metrics: Metrics) -> Self { - Self { metrics, ..self } + /// Disable the admin interface + pub fn with_disabled_admin(self) -> Self { + Self { + admin: None, + ..self + } } // Validates the builder's config and filter configurations. @@ -261,6 +270,7 @@ impl Builder { Ok(Builder { log: self.log, config: self.config, + admin: self.admin, metrics: self.metrics, filter_registry: self.filter_registry, validation_status: Validated(validated_config), @@ -275,6 +285,7 @@ impl Builder { config: Arc::new(self.validation_status.0), proxy_metrics: ProxyMetrics::new(&self.metrics.registry.clone()) .expect("metrics should be setup properly"), + admin: self.admin, metrics: self.metrics, filter_registry: Arc::new(self.filter_registry), } diff --git a/src/proxy/metrics.rs b/src/proxy/metrics.rs index c7b9d136ea..4b780db7d1 100644 --- a/src/proxy/metrics.rs +++ b/src/proxy/metrics.rs @@ -14,111 +14,27 @@ * limitations under the License. 
*/ -use crate::proxy::sessions::metrics::Metrics as SessionMetrics; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Response, Server as HyperServer, StatusCode}; -use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder}; -use slog::{error, info, warn, Logger}; -use std::convert::Infallible; use std::net::SocketAddr; -use tokio::sync::watch::Receiver; + +use hyper::{Body, Response, StatusCode}; +use prometheus::{Encoder, Registry, Result as MetricsResult, TextEncoder}; +use slog::{o, warn, Logger}; + +use crate::proxy::sessions::metrics::Metrics as SessionMetrics; /// Metrics contains metrics configuration for the server. #[derive(Clone)] pub struct Metrics { - /// addr is the socket address on which the server exposes metrics. - /// If none is provided the server does not expose any metrics. - pub addr: Option, - pub registry: Registry, -} - -/// start_metrics_server starts a HTTP server in the background at `addr` which -/// serves prometheus metrics from `registry`. 
The server is bounded by `shutdown_signal`, -pub fn start_metrics_server( - addr: SocketAddr, - registry: Registry, - mut shutdown_rx: Receiver<()>, log: Logger, -) { - info!(log, "starting metrics endpoint at {}", addr.to_string()); - - let handler_log = log.clone(); - let make_svc = make_service_fn(move |_conn| { - let registry = registry.clone(); - let handler_log = handler_log.clone(); - async move { - let registry = registry.clone(); - let handler_log = handler_log.clone(); - Ok::<_, Infallible>(service_fn(move |req| { - let registry = registry.clone(); - let handler_log = handler_log.clone(); - async move { - Ok::<_, Infallible>(handle_request( - handler_log, - req.method(), - req.uri().path(), - registry, - )) - } - })) - } - }); - - let server = HyperServer::bind(&addr) - .serve(make_svc) - .with_graceful_shutdown(async move { - shutdown_rx.changed().await.ok(); - }); - - tokio::spawn(async move { - if let Err(err) = server.await { - error!(log, "metrics server exited with an error: {}", err); - } - }); -} - -fn handle_request(log: Logger, method: &Method, path: &str, registry: Registry) -> Response<Body> { - let mut response = Response::new(Body::empty()); - - match (method, path) { - (&Method::GET, "/metrics") => { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - let body = encoder - .encode(&registry.gather(), &mut buffer) - .map_err(|err| warn!(log, "failed to encode metrics: {:?}", err)) - .and_then(|_| { - String::from_utf8(buffer) - .map(Body::from) - .map_err(|err| warn!(log, "failed to convert metrics to utf8: {:?}", err)) - }); - - match body { - Ok(body) => { - *response.body_mut() = body; - } - Err(_) => { - *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - } - } - } - _ => { - *response.status_mut() = StatusCode::NOT_FOUND; - } - }; - - response -} - -impl Default for Metrics { - fn default() -> Self { - Metrics::new(None, Registry::default()) - } + pub(crate) registry: Registry, } impl Metrics { - pub fn new(addr: Option<SocketAddr>, 
registry: Registry) -> Self { - Metrics { addr, registry } + pub fn new(base: &Logger, registry: Registry) -> Self { + Metrics { + log: base.new(o!("source" => "proxy::Metrics")), + registry, + } } pub fn new_session_metrics( @@ -132,4 +48,46 @@ impl Metrics { upstream.to_string(), ) } + + pub fn admin_response(&self) -> Response { + let mut response = Response::new(Body::empty()); + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let body = encoder + .encode(&self.registry.gather(), &mut buffer) + .map_err(|err| warn!(self.log, "Failed to encode metrics"; "error" => %err)) + .and_then(|_| { + String::from_utf8(buffer).map(Body::from).map_err( + |err| warn!(self.log, "Failed to convert metrics to utf8"; "error" => %err), + ) + }); + + match body { + Ok(body) => { + *response.body_mut() = body; + } + Err(_) => { + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + } + }; + + response + } +} + +#[cfg(test)] +mod tests { + use hyper::StatusCode; + use prometheus::Registry; + + use crate::proxy::Metrics; + use crate::test_utils::logger; + + #[tokio::test] + async fn admin_response() { + let log = logger(); + let metrics = Metrics::new(&log, Registry::default()); + let response = metrics.admin_response(); + assert_eq!(response.status(), StatusCode::OK); + } } diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 609956870e..75007e4e54 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -14,10 +14,12 @@ * limitations under the License. 
*/ +pub use admin::Admin; pub use builder::{logger, Builder}; pub use metrics::Metrics; pub use server::Server; +mod admin; mod builder; mod metrics; mod server; diff --git a/src/proxy/server/mod.rs b/src/proxy/server/mod.rs index 88ed87e876..ca3067be7b 100644 --- a/src/proxy/server/mod.rs +++ b/src/proxy/server/mod.rs @@ -28,20 +28,21 @@ use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; use metrics::Metrics as ProxyMetrics; +use resource_manager::{DynamicResourceManagers, StaticResourceManagers}; use crate::cluster::cluster_manager::SharedClusterManager; use crate::cluster::Endpoint; +use crate::extensions::filter_manager::SharedFilterManager; use crate::extensions::{Filter, FilterRegistry, ReadContext}; +use crate::proxy::builder::{ValidatedConfig, ValidatedSource}; use crate::proxy::server::error::Error; use crate::proxy::sessions::{ Packet, Session, SESSION_EXPIRY_POLL_INTERVAL, SESSION_TIMEOUT_SECONDS, }; use crate::utils::debug; -use super::metrics::{start_metrics_server, Metrics}; -use crate::extensions::filter_manager::SharedFilterManager; -use crate::proxy::builder::{ValidatedConfig, ValidatedSource}; -use resource_manager::{DynamicResourceManagers, StaticResourceManagers}; +use super::metrics::Metrics; +use crate::proxy::Admin; pub mod error; pub(super) mod metrics; @@ -56,7 +57,9 @@ pub struct Server { // We use pub(super) to limit instantiation only to the Builder. pub(super) log: Logger, pub(super) config: Arc, - pub(super) metrics: Metrics, + // Admin may be turned off, primarily for testing. + pub(super) admin: Option, + pub(super) metrics: Arc, pub(super) proxy_metrics: ProxyMetrics, pub(super) filter_registry: Arc, } @@ -89,7 +92,7 @@ struct DownstreamReceiveWorkerConfig { /// filter chain and session pipeline. 
struct ProcessDownstreamReceiveConfig { log: Logger, - metrics: Metrics, + metrics: Arc, proxy_metrics: ProxyMetrics, cluster_manager: SharedClusterManager, filter_manager: SharedFilterManager, @@ -104,13 +107,8 @@ impl Server { pub async fn run(self, mut shutdown_rx: watch::Receiver<()>) -> Result<()> { self.log_config(); - if let Some(addr) = self.metrics.addr { - start_metrics_server( - addr, - self.metrics.registry.clone(), - shutdown_rx.clone(), - self.log.clone(), - ); + if let Some(admin) = &self.admin { + admin.run(shutdown_rx.clone()); } let socket = Arc::new(Server::bind(self.config.proxy.port).await?); @@ -573,6 +571,7 @@ mod tests { use std::ops::Add; use std::sync::Arc; + use prometheus::Registry; use slog::info; use tokio::sync::{mpsc, RwLock}; use tokio::time; @@ -582,6 +581,7 @@ mod tests { use crate::cluster::cluster_manager::ClusterManager; use crate::config; use crate::config::{Builder as ConfigBuilder, EndPoint, Endpoints}; + use crate::extensions::filter_manager::FilterManager; use crate::extensions::{FilterChain, FilterRegistry}; use crate::proxy::sessions::Packet; use crate::proxy::Builder; @@ -590,8 +590,6 @@ mod tests { }; use super::*; - use crate::extensions::filter_manager::FilterManager; - use prometheus::Registry; #[tokio::test] async fn run_server() { @@ -749,14 +747,16 @@ mod tests { let (packet_tx, packet_rx) = mpsc::channel(num_workers); packet_txs.push(packet_tx); + let metrics = Arc::new(Metrics::new(&t.log, Registry::default())); + let proxy_metrics = ProxyMetrics::new(&metrics.registry).unwrap(); worker_configs.push(DownstreamReceiveWorkerConfig { worker_id, packet_rx, shutdown_rx: shutdown_rx.clone(), receive_config: ProcessDownstreamReceiveConfig { log: t.log.clone(), - metrics: Metrics::default(), - proxy_metrics: ProxyMetrics::new(&Metrics::default().registry).unwrap(), + metrics, + proxy_metrics, cluster_manager: cluster_manager.clone(), filter_manager: filter_manager.clone(), sessions: sessions.clone(), @@ -913,7 +913,7 
@@ mod tests { key, Session::new( &t.log, - Metrics::default() + Metrics::new(&t.log, Registry::default()) .new_session_metrics(&from, &endpoint.address) .unwrap(), FilterManager::fixed(Arc::new(FilterChain::new(vec![]))), @@ -983,7 +983,7 @@ mod tests { key, Session::new( &t.log, - Metrics::default() + Metrics::new(&t.log, Registry::default()) .new_session_metrics(&from, &endpoint.address) .unwrap(), FilterManager::fixed(Arc::new(FilterChain::new(vec![]))), diff --git a/src/runner.rs b/src/runner.rs index bc5bf997c4..810d40a32b 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -18,14 +18,13 @@ use std::fs::File; use std::sync::Arc; use clap::App; -use prometheus::Registry; use slog::{info, o, Logger}; use tokio::signal; use tokio::sync::watch; use crate::config::Config; use crate::extensions::{default_registry, FilterFactory, FilterRegistry}; -use crate::proxy::{logger, Builder, Metrics}; +use crate::proxy::{logger, Builder}; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -73,14 +72,6 @@ pub async fn run(filter_factories: Vec>) -> Result<(), Er ); let server = Builder::from(config) .with_log(base_logger) - .with_metrics(Metrics::new( - Some( - "[::]:9091" - .parse() - .map_err(|err| Error(format!("failed to create metrics address: {}", err)))?, - ), - Registry::default(), - )) .with_filter_registry(create_filter_registry(&log, filter_factories)) .validate() .map_err(|err| Error(format!("{:?}", err)))? 
diff --git a/src/test_utils.rs b/src/test_utils.rs index ea536ca56f..63387b2286 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -30,7 +30,7 @@ use crate::extensions::{ default_registry, CreateFilterArgs, Error, Filter, FilterFactory, FilterRegistry, ReadContext, ReadResponse, WriteContext, WriteResponse, }; -use crate::proxy::{Builder, Metrics}; +use crate::proxy::Builder; pub struct TestFilterFactory {} impl FilterFactory for TestFilterFactory { @@ -132,24 +132,6 @@ impl Default for TestHelper { } impl TestHelper { - /// Creates a [`Server`] and runs it. The server is shutdown once `self` - /// goes out of scope. - pub fn run_server(&mut self, config: Config) { - self.run_server_with_filter_registry(config, default_registry(&self.log)) - } - - pub fn run_server_with_filter_registry( - &mut self, - config: Config, - filter_registry: FilterRegistry, - ) { - self.run_server_with_arguments(config, filter_registry, Metrics::default()) - } - - pub fn run_server_with_metrics(&mut self, config: Config, metrics: Metrics) { - self.run_server_with_arguments(config, default_registry(&self.log), metrics) - } - /// Opens a new socket bound to an ephemeral port pub async fn create_socket(&self) -> Arc { let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0); @@ -242,19 +224,39 @@ impl TestHelper { addr } + /// Creates a [`Server`] and runs it. The server is shutdown once `self` + /// goes out of scope. + pub fn run_server(&mut self, config: Config) { + self.run_server_with_filter_registry(config, default_registry(&self.log)) + } + + pub fn run_server_with_filter_registry( + &mut self, + config: Config, + filter_registry: FilterRegistry, + ) { + self.run_server_with_arguments(config, filter_registry, true) + } + + pub fn run_server_with_admin(&mut self, config: Config) { + self.run_server_with_arguments(config, default_registry(&self.log), false) + } + /// Create and run a server. 
fn run_server_with_arguments( &mut self, config: Config, filter_registry: FilterRegistry, - metrics: Metrics, + disable_admin: bool, ) { let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); self.server_shutdown_tx.push(Some(shutdown_tx)); tokio::spawn(async move { - Builder::from(Arc::new(config)) - .with_filter_registry(filter_registry) - .with_metrics(metrics) + let mut builder = Builder::from(Arc::new(config)).with_filter_registry(filter_registry); + if disable_admin { + builder = builder.with_disabled_admin(); + } + builder .validate() .unwrap() .build() diff --git a/tests/metrics.rs b/tests/metrics.rs index c2f84e374d..0228baba5d 100644 --- a/tests/metrics.rs +++ b/tests/metrics.rs @@ -20,18 +20,15 @@ extern crate quilkin; mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use prometheus::Registry; use regex::Regex; use slog::info; - use quilkin::config::{Builder as ConfigBuilder, EndPoint}; - use quilkin::proxy::Metrics; + use quilkin::config::{Admin, Builder as ConfigBuilder, EndPoint}; use quilkin::test_utils::TestHelper; #[tokio::test] async fn metrics_server() { let mut t = TestHelper::default(); - let server_metrics = Metrics::new(Some("[::]:9092".parse().unwrap()), Registry::default()); // create an echo server as an endpoint. let echo = t.run_echo_server().await; @@ -41,8 +38,11 @@ mod tests { let server_config = ConfigBuilder::empty() .with_port(server_port) .with_static(vec![], vec![EndPoint::new(echo)]) + .with_admin(Admin { + address: "[::]:9092".parse().unwrap(), + }) .build(); - t.run_server_with_metrics(server_config, server_metrics); + t.run_server_with_admin(server_config); // create a local client let client_port = 12347;