Skip to content

Commit

Permalink
Reduce the log levl when no receiver is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
Goutham Kannan committed Dec 18, 2023
1 parent 4dcc68b commit 940f56e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docbot-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn({
let cache = cache.clone();
let client = client.clone();
let pod_template_service = pod_template_service.clone();
// let pod_template_service = pod_template_service.clone();

async move {
// Watch for deployment changes
Expand Down
23 changes: 16 additions & 7 deletions docbot-crd/src/pod_template.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::{self, FutureExt};
use futures::future::{self};
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::PodTemplate;
use kube::{
Expand Down Expand Up @@ -48,15 +48,19 @@ impl PodTemplateService {
// Subscribe to the change, await for one, or bail out if the duration expires.
let mut receiver = self.changes_channel.subscribe();

let recv_future = Box::pin(async move {

let recv_future = tokio::spawn(async move {
while let Ok(pod_template_name_namespace_pair) = receiver.recv().await {
if pod_template_name_namespace_pair == hook_name {
return ();
return;
}
}
});

future::select(recv_future, tokio::time::sleep(timeout).boxed()).await;
let recv_task = recv_future;
let sleep_task = Box::pin(tokio::time::sleep(timeout));

future::select(recv_task, sleep_task).await;

Ok(())
}
Expand Down Expand Up @@ -134,9 +138,14 @@ impl PodTemplateService {
if let Err(err) =
changes_channel.send(format!("{}/{}", namespace.clone(), name.clone()))
{
error!("Unable to publish a change to {namespace}/{name} over internal brodcast stream with error {}", err);
} else {
info!("Published event for {}", format!("{}/{}", namespace.clone(), name.clone()))
info!("receiver count {}", changes_channel.receiver_count());
warn!("Unable to publish a change to {namespace}/{name} over internal brodcast stream with error {}",
err);
} else {
info!(
"Published event for {}",
format!("{}/{}", namespace.clone(), name.clone())
);
}
}
_ => { /* ignore */ }
Expand Down

0 comments on commit 940f56e

Please sign in to comment.