forked from istio/ztunnel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path app.rs
372 lines (330 loc) · 12.2 KB
/
app.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use crate::proxyfactory::ProxyFactory;
use crate::drain;
use anyhow::Context;
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use tokio::task::JoinSet;
use tracing::{warn, Instrument};
use crate::identity::SecretManager;
use crate::state::ProxyStateManager;
use crate::{admin, config, metrics, proxy, readiness, signal};
use crate::{dns, xds};
/// Builds and starts every ztunnel component using an already-constructed certificate manager.
///
/// In order, this:
/// - starts a dedicated data plane worker pool (`new_data_plane_pool`);
/// - registers readiness tasks and runs the readiness server on the data plane pool;
/// - registers metrics and creates the XDS-driven `ProxyStateManager`, which is run on the
///   current tokio runtime;
/// - starts the admin server;
/// - in `ProxyMode::Shared`, starts the in-pod workload proxy manager; otherwise creates the
///   dedicated-mode HBONE proxy and/or DNS proxy from `proxy_workload_information`;
/// - starts the metrics server.
///
/// Every proxy future handed to the data plane pool first awaits a change on the watch
/// channel whose sender (`xds_tx`) is given to `ProxyStateManager`, so proxies do not start
/// serving before the state manager has signalled (or dropped) that sender.
///
/// Returns a [`Bound`] holding the listening addresses plus the shutdown and drain handles.
///
/// # Errors
/// Returns an error if the readiness, admin or metrics server fails to start, if the state
/// manager or proxy factory cannot be created, if the in-pod manager cannot be initialized,
/// if dedicated-mode proxies cannot be created, if `proxy_workload_information` is missing
/// in dedicated mode, or if a task cannot be sent to the data plane pool.
pub async fn build_with_cert(
    config: Arc<config::Config>,
    cert_manager: Arc<SecretManager>,
) -> anyhow::Result<Bound> {
    // Start the data plane worker pool.
    let data_plane_pool = new_data_plane_pool(config.num_worker_threads);

    let shutdown = signal::Shutdown::new();
    // Setup a drain channel. drain_tx is used to trigger a drain, which will complete
    // once all drain_rx handlers are dropped.
    // Any component which wants time to gracefully exit should take in a drain_rx clone,
    // await drain_rx.signaled(), then cleanup.
    // Note: there is still a hard timeout if the draining takes too long
    let (drain_tx, drain_rx) = drain::new();

    // Register readiness tasks. Each handle is dropped below once the corresponding
    // component has been started.
    let ready = readiness::Ready::new();
    let state_mgr_task = ready.register_task("state manager");
    let proxy_task = if config.proxy {
        Some(ready.register_task("proxy"))
    } else {
        None
    };
    let dns_task = if config.dns_proxy {
        Some(ready.register_task("dns proxy"))
    } else {
        None
    };

    // Create and start the readiness server.
    let readiness_server = readiness::Server::new(config.clone(), drain_rx.clone(), ready.clone())
        .await
        .context("readiness server starts")?;
    let readiness_address = readiness_server.address();
    // Run the readiness server in the data plane worker pool.
    data_plane_pool.send(DataPlaneTask {
        block_shutdown: false,
        fut: Box::pin(async move {
            readiness_server.spawn();
            Ok(())
        }),
    })?;

    // Register metrics.
    let mut registry = Registry::default();
    let istio_registry = metrics::sub_registry(&mut registry);
    let _ = metrics::meta::Metrics::new(istio_registry);
    let xds_metrics = xds::Metrics::new(istio_registry);
    let proxy_metrics = Arc::new(proxy::Metrics::new(istio_registry));
    let dns_metrics = if config.dns_proxy {
        Some(dns::Metrics::new(istio_registry))
    } else {
        None
    };

    // The state manager signals `xds_tx`; every `xds_rx` clone below waits on it.
    let (xds_tx, xds_rx) = tokio::sync::watch::channel(());
    // Create the manager that updates proxy state from XDS.
    let state_mgr = ProxyStateManager::new(
        config.clone(),
        xds_metrics,
        proxy_metrics.clone(),
        xds_tx,
        cert_manager.clone(),
    )
    .await?;
    // Mark the state manager ready on the first signal (or when the sender is dropped).
    let mut xds_rx_for_task = xds_rx.clone();
    tokio::spawn(async move {
        let _ = xds_rx_for_task.changed().await;
        std::mem::drop(state_mgr_task);
    });
    let state = state_mgr.state();

    // Run the XDS state manager in the current tokio worker pool.
    tokio::spawn(state_mgr.run());

    // Create and start the admin server.
    let mut admin_server = admin::Service::new(
        config.clone(),
        state.clone(),
        shutdown.trigger(),
        drain_rx.clone(),
        cert_manager.clone(),
    )
    .await
    .context("admin server starts")?;
    let admin_address = admin_server.address();

    // Optionally create the HBONE proxy.
    let mut proxy_addresses = None;
    let mut tcp_dns_proxy_address: Option<SocketAddr> = None;
    let mut udp_dns_proxy_address: Option<SocketAddr> = None;

    let proxy_gen = ProxyFactory::new(
        config.clone(),
        state.clone(),
        cert_manager.clone(),
        proxy_metrics,
        dns_metrics,
        drain_rx.clone(),
    )
    .map_err(|e| anyhow::anyhow!("failed to start proxy factory {:?}", e))?;

    if config.proxy_mode == config::ProxyMode::Shared {
        tracing::info!("shared proxy mode - in-pod mode enabled");
        let run_future = init_inpod_proxy_mgr(
            &mut registry,
            &mut admin_server,
            &config,
            proxy_gen,
            ready.clone(),
            drain_rx.clone(),
        )?;

        let mut xds_rx_for_proxy = xds_rx.clone();
        data_plane_pool.send(DataPlaneTask {
            block_shutdown: true,
            fut: Box::pin(async move {
                let _ = xds_rx_for_proxy.changed().await;
                run_future.in_current_span().await;
                Ok(())
            }),
        })?;
    } else {
        tracing::info!("proxy mode enabled");
        // A missing workload identity is a configuration error: report it to the caller
        // instead of panicking, consistent with the other startup failures above.
        let wli = config
            .proxy_workload_information
            .clone()
            .context("proxy_workload_information is required for dedicated mode")?;
        let proxies = proxy_gen.new_proxies_for_dedicated(wli).await?;
        match proxies.proxy {
            Some(proxy) => {
                proxy_addresses = Some(proxy.addresses());

                // Run the HBONE proxy in the data plane worker pool.
                let mut xds_rx_for_proxy = xds_rx.clone();
                data_plane_pool.send(DataPlaneTask {
                    block_shutdown: true,
                    fut: Box::pin(async move {
                        let _ = xds_rx_for_proxy.changed().await;
                        proxy.run().in_current_span().await;
                        Ok(())
                    }),
                })?;

                drop(proxy_task);
            }
            None => {
                tracing::info!("no proxy created");
            }
        }

        match proxies.dns_proxy {
            Some(dns_proxy) => {
                // Optional
                tcp_dns_proxy_address = Some(dns_proxy.tcp_address());
                udp_dns_proxy_address = Some(dns_proxy.udp_address());

                // Run the DNS proxy in the data plane worker pool.
                let mut xds_rx_for_dns_proxy = xds_rx.clone();
                data_plane_pool.send(DataPlaneTask {
                    block_shutdown: true,
                    fut: Box::pin(async move {
                        let _ = xds_rx_for_dns_proxy.changed().await;
                        dns_proxy.run().in_current_span().await;
                        Ok(())
                    }),
                })?;

                drop(dns_task);
            }
            None => {
                tracing::info!("no dns proxy created");
            }
        }
    }

    // Run the admin server in the current tokio worker pool.
    admin_server.spawn();

    // Create and start the metrics server.
    let metrics_server = metrics::Server::new(config.clone(), drain_rx.clone(), registry)
        .await
        .context("stats server starts")?;
    let metrics_address = metrics_server.address();
    // Run the metrics server in the current tokio worker pool.
    metrics_server.spawn();

    Ok(Bound {
        drain_tx,
        shutdown,
        readiness_address,
        admin_address,
        metrics_address,
        proxy_addresses,
        tcp_dns_proxy_address,
        udp_dns_proxy_address,
    })
}
struct DataPlaneTask {
block_shutdown: bool,
fut: Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
}
fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask> {
let (tx, rx) = mpsc::channel();
let span = tracing::span::Span::current();
thread::spawn(move || {
let _span = span.enter();
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_worker_threads)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("ztunnel-proxy-{id}")
})
.enable_all()
.build()
.unwrap();
runtime.block_on(
async move {
let mut join_set = JoinSet::new();
// Spawn tasks as they're received, until all tasks are spawned.
let task_iter: mpsc::Iter<DataPlaneTask> = rx.iter();
for task in task_iter {
if task.block_shutdown {
// We'll block shutdown on this task.
join_set.spawn(task.fut);
} else {
// We won't block shutdown of this task. Just spawn and forget.
tokio::spawn(task.fut);
}
}
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok(result) => {
if let Err(e) = result {
warn!("data plane task failed: {e}");
}
}
Err(e) => warn!("failed joining data plane task: {e}"),
}
}
}
.in_current_span(),
);
});
tx
}
/// Builds and starts ztunnel, choosing the certificate manager from `config.fake_ca`.
///
/// With `fake_ca` set, a mock secret manager is used; otherwise a real `SecretManager`
/// is constructed from `config`. Startup then proceeds via [`build_with_cert`].
///
/// # Errors
/// Propagates failures from `SecretManager::new` and from [`build_with_cert`].
pub async fn build(config: Arc<config::Config>) -> anyhow::Result<Bound> {
    if config.fake_ca {
        return build_with_cert(config, mock_secret_manager()).await;
    }
    let secrets = Arc::new(SecretManager::new(config.clone()).await?);
    build_with_cert(config, secrets).await
}
/// Mock secret manager used when `fake_ca` is enabled (requires the `testing` feature).
#[cfg(feature = "testing")]
fn mock_secret_manager() -> Arc<SecretManager> {
    // 24h == 86400 seconds; passed to the mock secret manager constructor.
    const ONE_DAY: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);
    crate::identity::mock::new_secret_manager(ONE_DAY)
}

/// Stub used when the `testing` feature is disabled.
///
/// # Panics
/// Always panics: a fake CA is only available with `--features testing`.
#[cfg(not(feature = "testing"))]
fn mock_secret_manager() -> Arc<SecretManager> {
    unimplemented!("fake_ca requires --features testing")
}
/// In-pod (shared proxy mode) stub for non-Linux targets.
///
/// # Errors
/// Always returns an error, because in-pod mode is only implemented on Linux.
#[cfg(not(target_os = "linux"))]
fn init_inpod_proxy_mgr(
    _registry: &mut Registry,
    _admin_server: &mut crate::admin::Service,
    _config: &config::Config,
    _proxy_gen: ProxyFactory,
    _ready: readiness::Ready,
    _drain_rx: drain::DrainWatcher,
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
    Err(anyhow::anyhow!(
        "in-pod mode is not supported on non-linux platforms"
    ))
}
/// Initializes the in-pod workload proxy manager (Linux only).
///
/// Registers the manager's metrics under the `workload_manager` prefix of `registry`,
/// then builds the manager through `crate::inpod::init_and_new`. Returns a boxed future
/// that runs the manager until `drain_rx` completes it. If the manager returns an error,
/// the future logs it and terminates the process with exit code 1.
///
/// # Errors
/// Returns an error if `crate::inpod::init_and_new` fails.
#[cfg(target_os = "linux")]
fn init_inpod_proxy_mgr(
    registry: &mut Registry,
    admin_server: &mut crate::admin::Service,
    config: &config::Config,
    proxy_gen: ProxyFactory,
    ready: readiness::Ready,
    drain_rx: drain::DrainWatcher,
) -> anyhow::Result<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>> {
    let workload_registry = registry.sub_registry_with_prefix("workload_manager");
    let metrics = Arc::new(crate::inpod::metrics::Metrics::new(workload_registry));

    let proxy_mgr = crate::inpod::init_and_new(metrics, admin_server, config, proxy_gen, ready)
        .map_err(|e| anyhow::anyhow!("failed to start workload proxy manager {:?}", e))?;

    let run_manager = async move {
        if let Err(e) = proxy_mgr.run(drain_rx).await {
            tracing::error!("WorkloadProxyManager run error: {:?}", e);
            // A failed workload manager is treated as fatal for the whole process.
            std::process::exit(1);
        }
    };
    Ok(Box::pin(run_manager))
}
/// Handles returned by a successful build: where each component is listening, plus the
/// controls needed to shut everything down.
pub struct Bound {
    /// Address of the admin server.
    pub admin_address: SocketAddr,
    /// Address of the metrics (stats) server.
    pub metrics_address: SocketAddr,
    /// Address of the readiness server.
    pub readiness_address: SocketAddr,
    /// Dedicated-mode HBONE proxy addresses, if such a proxy was created.
    pub proxy_addresses: Option<proxy::Addresses>,
    /// TCP address of the dedicated-mode DNS proxy, if one was created.
    pub tcp_dns_proxy_address: Option<SocketAddr>,
    /// UDP address of the dedicated-mode DNS proxy, if one was created.
    pub udp_dns_proxy_address: Option<SocketAddr>,
    /// Shutdown signal shared with the admin server.
    pub shutdown: signal::Shutdown,
    /// Trigger used to start draining all components holding a drain watcher.
    drain_tx: drain::DrainTrigger,
}

impl Bound {
    /// Waits for a shutdown request, then performs a graceful drain.
    ///
    /// Shutdown is requested either explicitly through the admin server or by a signal.
    /// The drain attempts to close all connections; a stronger TERM signal may interrupt it.
    pub async fn wait_termination(self) -> anyhow::Result<()> {
        // Park until shutdown is requested.
        self.shutdown.wait().await;

        // Drain gracefully, waiting until it completes or is interrupted.
        let mode = drain::DrainMode::Graceful;
        self.drain_tx.start_drain_and_wait(mode).await;

        Ok(())
    }
}