Skip to content

Commit

Permalink
Prevent endless updating when a service is paused
Browse files Browse the repository at this point in the history
In the case of a paused app, the Kubernetes backend endlessly causes the web
host meta cache to crawl web host meta information. Thus, it floods all
browsers with refresh notifications all the time, which then increases the
load on the server.

This commit fixes this by making the started_at value optional, so that no
Utc::now fallback is necessary.
  • Loading branch information
schrieveslaach committed Jan 21, 2025
1 parent e868f0c commit 25b9777
Show file tree
Hide file tree
Showing 11 changed files with 593 additions and 404 deletions.
630 changes: 323 additions & 307 deletions api/Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ anyhow = "1.0"
async-trait = "0.1"
async-stream = "0.3"
base64 = "0.22"
boa_engine = "0.19"
boa_engine = "0.20"
bollard = { version = "0.18", features = ["chrono"] }
bytesize = { version = "1.3", features = ["serde"] }
bytes = "1.7"
bytes = "1.9"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive", "cargo", "help", "usage", "error-context"] }
dyn-clone = "1.0"
env_logger = "0.11"
evmap = "10.0"
figment = { version = "0.10", features = ["env", "toml"] }
futures = { version = "0.3", features = ["compat"] }
handlebars = "6.0"
http = "1.1"
http-api-problem = "0.58"
handlebars = "6.3"
http = "1.2"
http-api-problem = "0.60"
hyper = "1.5"
hyper-util = "0.1"
http-body-util = "0.1"
jira_query = "1.5"
jsonschema = "0.26"
k8s-openapi = { version = "0.23", default-features = false, features = ["v1_26"] }
kube = { version = "0.97", default-features = false, features = ["client", "derive", "rustls-tls", "ws"] }
jsonschema = "0.28"
k8s-openapi = { version = "0.24", default-features = false, features = ["v1_28"] }
kube = { version = "0.98", default-features = false, features = ["client", "derive", "rustls-tls", "ws"] }
lazy_static = "1.5"
log = "0.4"
multimap = "0.10"
Expand All @@ -56,16 +56,16 @@ serde_regex = "1.1"
serde_yaml = "0.9"
tar = "0.4"
thiserror = "2.0"
tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tokio = { version = "1.43", features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
toml = "0.8"
url = { version = "2.4", features = ["serde"] }
uuid = { version = "1.9", features = ["serde", "v4"] }
uuid = { version = "1.12", features = ["serde", "v4"] }
yansi = "1.0"

[dev-dependencies]
assert-json-diff = "2.0"
figment = { version = "0.10", features = ["test"] }
sha2 = "0.10"
tempfile = "3.7"
tempfile = "3.15"

241 changes: 204 additions & 37 deletions api/src/apps/host_meta_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct Key {

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct Value {
timestamp: DateTime<Utc>,
last_update_timestamp: DateTime<Utc>,
web_host_meta: WebHostMeta,
}

Expand Down Expand Up @@ -170,8 +170,16 @@ impl HostMetaCrawler {
else => continue,
};

let http_forwarder = match apps.http_forwarder().await {
Ok(http_forwarder) => http_forwarder,
Err(err) => {
error!("Cannot acquire http forwarder for crawling web host meta: {err}");
continue;
}
};

if let Some(timestamp) = self
.crawl(apps.clone(), &services, timestamp_prevant_startup)
.crawl(http_forwarder, &services, timestamp_prevant_startup)
.await
{
self.update_watch_tx.send_replace(timestamp);
Expand All @@ -182,13 +190,13 @@ impl HostMetaCrawler {

async fn crawl(
&mut self,
all_apps: Arc<Apps>,
http_forwarder: Box<dyn HttpForwarder>,
apps: &HashMap<AppName, Services>,
since_timestamp: DateTime<Utc>,
) -> Option<DateTime<Utc>> {
self.clear_stale_web_host_meta(apps);

let services_without_host_meta = apps
let running_services_without_host_meta = apps
.iter()
.flat_map(|(app_name, services)| {
services
Expand All @@ -202,25 +210,26 @@ impl HostMetaCrawler {
(key, service.clone())
})
})
.filter(|(_, service)| *service.status() == ServiceStatus::Running)
.filter(|(key, _service)| !self.writer.contains_key(key))
.collect::<Vec<(Key, Service)>>();

if services_without_host_meta.is_empty() {
if running_services_without_host_meta.is_empty() {
return None;
}

debug!(
"Resolving web host meta data for {:?}.",
services_without_host_meta
running_services_without_host_meta
.iter()
.map(|(k, service)| format!("({}, {})", k.app_name, service.service_name()))
.fold(String::new(), |a, b| a + &b + ", ")
);
let now = Utc::now();
let duration_prevant_startup = Utc::now().signed_duration_since(since_timestamp);
let resolved_host_meta_infos = Self::resolve_host_meta(
all_apps,
services_without_host_meta,
http_forwarder,
running_services_without_host_meta,
duration_prevant_startup,
)
.await;
Expand All @@ -235,7 +244,7 @@ impl HostMetaCrawler {
self.writer.insert(
key,
Arc::new(Value {
timestamp: now,
last_update_timestamp: now,
web_host_meta,
}),
);
Expand Down Expand Up @@ -267,11 +276,11 @@ impl HostMetaCrawler {
};

match service {
Some(service) => {
*service.status() == ServiceStatus::Paused
|| *service.started_at() > value.timestamp
// Return true if the service has been restarted in the meantime
Some(service) if service.started_at().is_some() => {
service.started_at().unwrap() > value.last_update_timestamp
}
None => true,
_ => true,
}
})
.map(|(key, _)| key)
Expand All @@ -290,7 +299,7 @@ impl HostMetaCrawler {
}

async fn resolve_host_meta(
apps: Arc<Apps>,
http_forwarder: Box<dyn HttpForwarder>,
services_without_host_meta: Vec<(Key, Service)>,
duration_prevant_startup: chrono::Duration,
) -> Vec<(Key, Service, WebHostMeta)> {
Expand All @@ -301,20 +310,17 @@ impl HostMetaCrawler {

let mut futures = services_without_host_meta
.into_iter()
.map(|(key, service)| async {
let http_forwarder = match apps.http_forwarder().await {
Ok(portforwarder) => portforwarder,
Err(err) => {
error!(
"Cannot forward TCP connection for {}, {}: {err}",
key.app_name,
service.service_name()
);
return (key, service, WebHostMeta::empty());
}
};
Self::resolve_web_host_meta(http_forwarder, key, service, duration_prevant_startup)
.map(|(key, service)| {
let http_forwarder = dyn_clone::clone_box(&*http_forwarder);
async {
Self::resolve_web_host_meta(
http_forwarder,
key,
service,
duration_prevant_startup,
)
.await
}
})
.collect::<FuturesUnordered<_>>();

Expand Down Expand Up @@ -383,27 +389,32 @@ impl HostMetaCrawler {
err
);

let duration = Utc::now().signed_duration_since(*service.started_at());
if duration >= chrono::Duration::minutes(5)
&& duration_prevant_startup >= chrono::Duration::minutes(1)
{
info!(
"Service {} is running for {}, therefore, it will be assumed that host-meta.json is not available.",
Paint::magenta(service.service_name()), duration
);
WebHostMeta::empty()
if let Some(started_at) = service.started_at() {
let duration = Utc::now().signed_duration_since(started_at);
if duration >= chrono::Duration::minutes(5)
&& duration_prevant_startup >= chrono::Duration::minutes(1)
{
info!(
"Service {} is running for {}, therefore, it will be assumed that host-meta.json is not available.",
Paint::magenta(service.service_name()), duration
);
WebHostMeta::empty()
} else {
WebHostMeta::invalid()
}
} else {
WebHostMeta::invalid()
}
}
};
(key, service, meta)
}

#[cfg(test)]
pub fn fake_empty_host_meta_info(&mut self, app_name: AppName, service_id: String) {
let web_host_meta = WebHostMeta::empty();
let value = Arc::new(Value {
timestamp: chrono::Utc::now(),
last_update_timestamp: chrono::Utc::now(),
web_host_meta,
});

Expand All @@ -419,3 +430,159 @@ impl HostMetaCrawler {
self.writer.flush();
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::service::State;
    use anyhow::Result;
    use url::Url;

    // Test double for the HTTP forwarder: instead of forwarding a request into a
    // container/cluster, it always answers with a valid host meta carrying
    // version "1.2.3". Cloneable because the crawler clones the forwarder per
    // service (see resolve_host_meta).
    #[derive(Clone)]
    struct DummyHttpForwarder {}

    #[async_trait]
    impl HttpForwarder for DummyHttpForwarder {
        async fn request_web_host_meta(
            &self,
            _app_name: &AppName,
            _service_name: &str,
            _request: http::Request<http_body_util::Empty<bytes::Bytes>>,
        ) -> Result<Option<WebHostMeta>> {
            // Unconditionally succeed with a fixed version so tests can assert
            // on a known WebHostMeta value.
            Ok(Some(WebHostMeta::with_version(String::from("1.2.3"))))
        }
    }

    // Happy path: a running service (started_at set) gets crawled and its web
    // host meta ends up in the cache.
    #[tokio::test]
    async fn crawl_host_meta() {
        let base_url = Url::parse("https://example.com").unwrap();
        let nginx_service = Service {
            id: String::from("nginx"),
            state: State {
                status: ServiceStatus::Running,
                // Running services carry a start timestamp.
                started_at: Some(Utc::now()),
            },
            config: crate::sc!("nginx", "nginx:latest"),
        };
        let forwarder = Box::new(DummyHttpForwarder {});
        let apps = HashMap::from([(
            AppName::master(),
            Services::from(vec![nginx_service.clone()]),
        )]);

        let (cache, mut crawler) = super::new();
        crawler.crawl(forwarder, &apps, Utc::now()).await;

        let apps = cache.update_meta_data(apps, &RequestInfo::new(base_url.clone()));
        // The crawled service must be enriched with the dummy forwarder's
        // version 1.2.3 host meta.
        assert_eq!(
            apps,
            HashMap::from([(
                AppName::master(),
                ServicesWithHostMeta::from(vec![
                    ServiceWithHostMeta::from_service_and_web_host_meta(
                        nginx_service,
                        WebHostMeta::with_version(String::from("1.2.3")),
                        base_url,
                        &AppName::master()
                    )
                ]),
            )])
        )
    }

    // A paused service (no started_at, not Running) must NOT be crawled; its
    // host meta stays empty. This is the core of the endless-update fix.
    #[tokio::test]
    async fn crawl_no_host_meta_for_paused_service() {
        let base_url = Url::parse("https://example.com").unwrap();
        let nginx_service = Service {
            id: String::from("nginx"),
            state: State {
                status: ServiceStatus::Paused,
                // Paused services have no start timestamp anymore.
                started_at: None,
            },
            config: crate::sc!("nginx", "nginx:latest"),
        };

        let forwarder = Box::new(DummyHttpForwarder {});
        let apps = HashMap::from([(
            AppName::master(),
            Services::from(vec![nginx_service.clone()]),
        )]);

        let (cache, mut crawler) = super::new();
        crawler.crawl(forwarder, &apps, Utc::now()).await;

        let apps = cache.update_meta_data(apps, &RequestInfo::new(base_url.clone()));
        // Even though the dummy forwarder would answer, the paused service is
        // filtered out before crawling, so only an empty host meta is attached.
        assert_eq!(
            apps,
            HashMap::from([(
                AppName::master(),
                ServicesWithHostMeta::from(vec![
                    ServiceWithHostMeta::from_service_and_web_host_meta(
                        nginx_service,
                        WebHostMeta::empty(),
                        base_url,
                        &AppName::master()
                    )
                ]),
            )])
        )
    }

    // A service that was crawled while Running and then transitions to Paused
    // must have its cached host meta cleared on the next crawl (stale-entry
    // eviction), not keep serving the old version info.
    #[tokio::test]
    async fn clear_host_meta_for_paused_service() {
        let base_url = Url::parse("https://example.com").unwrap();

        // populate the host meta cache first
        let nginx_service = Service {
            id: String::from("nginx"),
            state: State {
                status: ServiceStatus::Running,
                started_at: Some(Utc::now()),
            },
            config: crate::sc!("nginx", "nginx:latest"),
        };

        let forwarder = Box::new(DummyHttpForwarder {});
        let apps = HashMap::from([(
            AppName::master(),
            Services::from(vec![nginx_service]),
        )]);

        let (cache, mut crawler) = super::new();
        crawler.crawl(forwarder, &apps, Utc::now()).await;

        // recrawl data for paused nginx
        let nginx_service = Service {
            id: String::from("nginx"),
            state: State {
                status: ServiceStatus::Paused,
                started_at: None,
            },
            config: crate::sc!("nginx", "nginx:latest"),
        };

        let forwarder = Box::new(DummyHttpForwarder {});
        let apps = HashMap::from([(
            AppName::master(),
            Services::from(vec![nginx_service.clone()]),
        )]);

        crawler.crawl(forwarder, &apps, Utc::now()).await;

        let apps = cache.update_meta_data(apps, &RequestInfo::new(base_url.clone()));
        // After the second crawl the previously cached "1.2.3" meta must be
        // gone; the paused service is reported with an empty host meta.
        assert_eq!(
            apps,
            HashMap::from([(
                AppName::master(),
                ServicesWithHostMeta::from(vec![
                    ServiceWithHostMeta::from_service_and_web_host_meta(
                        nginx_service,
                        WebHostMeta::empty(),
                        base_url,
                        &AppName::master()
                    )
                ]),
            )])
        )
    }
}
Loading

0 comments on commit 25b9777

Please sign in to comment.