-
-
Notifications
You must be signed in to change notification settings - Fork 325
/
Copy pathpod_reflector.rs
51 lines (45 loc) · 1.59 KB
/
pod_reflector.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use std::pin::pin;
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::Api,
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let api: Api<Pod> = Api::default_namespaced(client);
let (reader, writer) = reflector::store::<Pod>();
tokio::spawn(async move {
// Show state every 5 seconds of watching
loop {
reader.wait_until_ready().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("Current pod count: {}", reader.state().len());
// full information with debug logs
for p in reader.state() {
let yaml = serde_yaml::to_string(p.as_ref()).unwrap();
debug!("Pod {}: \n{}", p.name_any(), yaml);
}
}
});
let stream = watcher(api, watcher::Config::default().any_semantic())
.default_backoff()
.modify(|pod| {
// memory optimization for our store - we don't care about managed fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
})
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::resource_version);
let mut stream = pin!(stream);
while let Some(pod) = stream.try_next().await? {
info!("saw {}", pod.name_any());
}
Ok(())
}