Skip to content

Commit

Permalink
Make all components optional (istio#592)
Browse files Browse the repository at this point in the history
The DNS proxy is already optional. This PR also makes the HBONE proxy optional, which allows running ztunnel as a stand-alone DNS proxy.

Moved the DNS proxy to a top-level package.

This also cleans up the metrics/stats and readiness servers to make the code more consistent.

Fixes istio#581
  • Loading branch information
nmittler authored Jul 12, 2023
1 parent 912a060 commit 62c5ed5
Show file tree
Hide file tree
Showing 29 changed files with 625 additions and 455 deletions.
11 changes: 4 additions & 7 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,15 @@ use tokio::runtime::Runtime;
use tokio::sync::Mutex;
use tracing::info;

use ztunnel::metrics::traffic::ConnectionOpen;
use ztunnel::metrics::{IncrementRecorder, Metrics};

use ztunnel::metrics::IncrementRecorder;
use ztunnel::rbac::{Authorization, RbacMatch, StringMatch};
use ztunnel::test_helpers::app::TestApp;
use ztunnel::test_helpers::tcp::Mode;
use ztunnel::test_helpers::TEST_WORKLOAD_HBONE;
use ztunnel::test_helpers::TEST_WORKLOAD_SOURCE;
use ztunnel::test_helpers::TEST_WORKLOAD_TCP;
use ztunnel::test_helpers::{helpers, tcp};

use ztunnel::{app, identity, test_helpers};
use ztunnel::{app, identity, metrics, proxy, test_helpers};

const KB: usize = 1024;
const MB: usize = 1024 * KB;
Expand Down Expand Up @@ -354,12 +351,12 @@ pub fn rbac_connections(c: &mut Criterion) {

pub fn metrics(c: &mut Criterion) {
let mut registry = Registry::default();
let metrics = Metrics::from(&mut registry);
let metrics = proxy::Metrics::new(metrics::sub_registry(&mut registry));

let mut c = c.benchmark_group("metrics");
c.bench_function("write", |b| {
b.iter(|| {
metrics.increment(&ConnectionOpen {
metrics.increment(&proxy::ConnectionOpen {
reporter: Default::default(),
source: Some(test_helpers::test_default_workload()),
derived_source: None,
Expand Down
230 changes: 180 additions & 50 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,83 +12,196 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::identity::SecretManager;
use crate::metrics::Metrics;
use crate::state::ProxyStateManager;
use crate::{admin, config, identity, proxy, readiness, signal, stats};
use anyhow::Context;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
use tracing::Instrument;

use anyhow::Context;
use prometheus_client::registry::Registry;
use tokio::task::JoinSet;
use tracing::{warn, Instrument};

use crate::identity::SecretManager;
use crate::state::ProxyStateManager;
use crate::{admin, config, identity, metrics, proxy, readiness, signal};
use crate::{dns, xds};

pub async fn build_with_cert(
config: config::Config,
cert_manager: Arc<SecretManager>,
) -> anyhow::Result<Bound> {
let mut registry = Registry::default();
let metrics = Arc::new(Metrics::from(&mut registry));
// Start the data plane worker pool.
let data_plane_pool = new_data_plane_pool(config.num_worker_threads);

let shutdown = signal::Shutdown::new();
// Setup a drain channel. drain_tx is used to trigger a drain, which will complete
// once all drain_rx handlers are dropped.
// Any component which wants time to gracefully exit should take in a drain_rx clone, await drain_rx.signaled(), then cleanup.
// Any component which wants time to gracefully exit should take in a drain_rx clone,
// await drain_rx.signaled(), then cleanup.
// Note: there is still a hard timeout if the draining takes too long
let (drain_tx, drain_rx) = drain::channel();

// Register readiness tasks.
let ready = readiness::Ready::new();
let proxy_task = ready.register_task("proxy listeners");
let state_mgr_task = ready.register_task("state manager");
let proxy_task = if config.proxy {
Some(ready.register_task("proxy"))
} else {
None
};
let dns_task = if config.dns_proxy {
Some(ready.register_task("dns proxy"))
} else {
None
};

// Create and start the readiness server.
let readiness_server = readiness::Server::new(config.clone(), drain_rx.clone(), ready.clone())
.await
.context("readiness server starts")?;
let readiness_address = readiness_server.address();
// Run the readiness server in the data plane worker pool.
data_plane_pool.send(DataPlaneTask {
block_shutdown: false,
fut: Box::pin(async move {
readiness_server.spawn();
Ok(())
}),
})?;

// Register metrics.
let mut registry = Registry::default();
let istio_registry = metrics::sub_registry(&mut registry);
let _ = metrics::meta::Metrics::new(istio_registry);
let xds_metrics = xds::Metrics::new(istio_registry);
let proxy_metrics = if config.proxy {
Some(proxy::Metrics::new(istio_registry))
} else {
None
};
let dns_metrics = if config.dns_proxy {
Some(dns::Metrics::new(istio_registry))
} else {
None
};

// Create and start the metrics server.
let metrics_server = metrics::Server::new(config.clone(), drain_rx.clone(), registry)
.await
.context("stats server starts")?;
let metrics_address = metrics_server.address();
// Run the metrics server in the current tokio worker pool.
metrics_server.spawn();

// Create the manager that updates proxy state from XDS.
let state_mgr = ProxyStateManager::new(
config.clone(),
metrics.clone(),
ready.register_task("workload manager"),
xds_metrics,
state_mgr_task,
cert_manager.clone(),
)
.await?;
let state = state_mgr.state();

// Create and start the admin server.
let admin_server = admin::Service::new(
config.clone(),
state_mgr.state.clone(),
state.clone(),
shutdown.trigger(),
drain_rx.clone(),
cert_manager.clone(),
)
.await
.context("admin server starts")?;
let stats_server = stats::Service::new(config.clone(), registry, drain_rx.clone())
.await
.context("stats server starts")?;
let readiness_server = readiness::Service::new(config.clone(), ready, drain_rx.clone())
.await
.context("readiness server starts")?;
let readiness_address = readiness_server.address();
let admin_address = admin_server.address();
let stats_address = stats_server.address();

let proxy = proxy::Proxy::new(
config.clone(),
state_mgr.state.clone(),
cert_manager.clone(),
metrics.clone(),
drain_rx.clone(),
)
.await?;
drop(proxy_task);

// spawn all tasks that should run in the main thread
// Run the admin server in the current tokio worker pool.
admin_server.spawn();
stats_server.spawn();

// Run the XDS state manager in the current tokio worker pool.
tokio::spawn(state_mgr.run());

let proxy_addresses = proxy.addresses();
// Optionally create the HBONE proxy.
let proxy_addresses = if config.proxy {
let proxy = proxy::Proxy::new(
config.clone(),
state.clone(),
cert_manager.clone(),
proxy_metrics.unwrap(),
drain_rx.clone(),
)
.await?;
let addresses = proxy.addresses();

// Run the HBONE proxy in the data plane worker pool.
data_plane_pool.send(DataPlaneTask {
block_shutdown: true,
fut: Box::pin(async move {
proxy.run().in_current_span().await;
Ok(())
}),
})?;

drop(proxy_task);
Some(addresses)
} else {
None
};

// Optionally create the DNS proxy.
let dns_proxy_address = if config.dns_proxy {
let dns_proxy = dns::Server::new(
config.dns_proxy_addr,
config.network,
state.clone(),
dns::forwarder_for_mode(config.proxy_mode)?,
dns_metrics.unwrap(),
)
.await?;
let address = dns_proxy.address();

// Run the DNS proxy in the data plane worker pool.
data_plane_pool.send(DataPlaneTask {
block_shutdown: true,
fut: Box::pin(async move {
dns_proxy.run().in_current_span().await;
Ok(())
}),
})?;

drop(dns_task);
Some(address)
} else {
None
};

Ok(Bound {
drain_tx,
shutdown,
readiness_address,
admin_address,
metrics_address,
proxy_addresses,
dns_proxy_address,
})
}

/// A unit of work sent over the `mpsc` channel to the data plane worker pool,
/// which spawns it on the pool's own tokio runtime.
struct DataPlaneTask {
/// When `true`, the pool adds the task to its `JoinSet` and waits for it to
/// finish before the runtime's `block_on` returns. When `false`, the task is
/// spawned with `tokio::spawn` and never joined.
block_shutdown: bool,
/// The boxed, pinned future to run. For joined tasks, an `Err` result is
/// logged by the pool as a warning rather than propagated.
fut: Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
}

fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask> {
let (tx, rx) = mpsc::channel();

let span = tracing::span::Span::current();
thread::spawn(move || {
let _span = span.enter();
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_worker_threads)
.worker_threads(num_worker_threads)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
Expand All @@ -99,21 +212,36 @@ pub async fn build_with_cert(
.unwrap();
runtime.block_on(
async move {
readiness_server.spawn();
proxy.run().in_current_span().await;
let mut join_set = JoinSet::new();

// Spawn tasks as they're received, until all tasks are spawned.
let task_iter: mpsc::Iter<DataPlaneTask> = rx.iter();
for task in task_iter {
if task.block_shutdown {
// We'll block shutdown on this task.
join_set.spawn(task.fut);
} else {
// We won't block shutdown on this task. Just spawn and forget.
tokio::spawn(task.fut);
}
}

while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok(result) => {
if let Err(e) = result {
warn!("data plane task failed: {e}");
}
}
Err(e) => warn!("failed joining data plane task: {e}"),
}
}
}
.in_current_span(),
);
});

Ok(Bound {
drain_tx,
shutdown,
readiness_address,
admin_address,
stats_address,
proxy_addresses,
})
tx
}

pub async fn build(config: config::Config) -> anyhow::Result<Bound> {
Expand All @@ -127,9 +255,11 @@ pub async fn build(config: config::Config) -> anyhow::Result<Bound> {

pub struct Bound {
pub admin_address: SocketAddr,
pub proxy_addresses: proxy::Addresses,
pub metrics_address: SocketAddr,
pub readiness_address: SocketAddr,
pub stats_address: SocketAddr,

pub proxy_addresses: Option<proxy::Addresses>,
pub dns_proxy_address: Option<SocketAddr>,

pub shutdown: signal::Shutdown,
drain_tx: drain::Signal,
Expand Down
Loading

0 comments on commit 62c5ed5

Please sign in to comment.