Scrape GuestOS metrics-proxy and clean up issues in multiservice-discovery. #91 #93

Merged 13 commits on Jan 25, 2024
Changes from all commits
@@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::fmt::Debug;

use service_discovery::{jobs::Job, TargetGroup};
use service_discovery::{job_types::JobType, TargetGroup};

pub trait Config: erased_serde::Serialize + Debug {
fn updated(&self) -> bool;
@@ -10,5 +10,5 @@ pub trait Config: erased_serde::Serialize + Debug {
erased_serde::serialize_trait_object!(Config);

pub trait ConfigBuilder {
fn build(&mut self, target_groups: BTreeSet<TargetGroup>, job: Job) -> Box<dyn Config>;
fn build(&mut self, target_groups: BTreeSet<TargetGroup>, job_type: JobType) -> Box<dyn Config>;
}
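
Both the ConfigBuilder trait above and the updater loop in the next file now receive plain JobType values instead of full Job structs. The real definition lives in service_discovery::job_types and is not part of this diff; the following is a minimal sketch, assuming JobType is a small Copy enum with a Display impl (the variants are the ones used elsewhere in this PR; the label strings are placeholders):

// Sketch only: assumed shape of service_discovery::job_types::{JobType, NodeOS}.
// The updater loop relies on JobType being Copy (so `*job` compiles), Display
// (for `job.to_string()` and log messages), and cheap to pass by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeOS {
    Guest,
    Host,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobType {
    Replica,
    Orchestrator,
    MetricsProxy,
    NodeExporter(NodeOS),
}

impl std::fmt::Display for JobType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder labels; the real names are defined in job_types.rs.
        let name = match self {
            JobType::Replica => "replica",
            JobType::Orchestrator => "orchestrator",
            JobType::MetricsProxy => "metrics-proxy",
            JobType::NodeExporter(NodeOS::Guest) => "node_exporter",
            JobType::NodeExporter(NodeOS::Host) => "host_node_exporter",
        };
        write!(f, "{}", name)
    }
}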
@@ -3,33 +3,28 @@ use std::{collections::BTreeSet, sync::Arc};
use crossbeam::select;
use crossbeam_channel::Receiver;
use service_discovery::metrics::Metrics;
use service_discovery::{jobs::Job, IcServiceDiscovery, TargetGroup};
use service_discovery::{job_types::JobType, IcServiceDiscovery, TargetGroup};
use slog::{info, warn};

use crate::{
config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter,
};
use crate::{config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter};

pub fn config_updater_loop(
log: slog::Logger,
discovery: Arc<dyn IcServiceDiscovery>,
filters: Arc<dyn TargetGroupFilter>,
shutdown_signal: Receiver<()>,
jobs: Vec<Job>,
jobs: Vec<JobType>,
update_signal_recv: Receiver<()>,
mut config_builder: impl ConfigBuilder,
config_updater: impl ConfigUpdater,
metrics: Metrics,
) -> impl FnMut() {
move || loop {
for job in &jobs {
let target_groups = match discovery.get_target_groups(job._type, log.clone()) {
let target_groups = match discovery.get_target_groups(*job, log.clone()) {
Ok(t) => t,
Err(e) => {
warn!(
log,
"Failed to retrieve targets for job {}: {:?}", job._type, e
);
warn!(log, "Failed to retrieve targets for job {}: {:?}", job, e);
continue;
}
};
@@ -41,10 +36,10 @@ pub fn config_updater_loop(

metrics
.total_targets
.with_label_values(&[job._type.to_string().as_str()])
.with_label_values(&[job.to_string().as_str()])
.set(target_groups.len().try_into().unwrap());

let config = config_builder.build(filtered_target_groups, job.clone());
let config = config_builder.build(filtered_target_groups, *job);
let config_binding = config.as_ref();
if let Err(e) = config_updater.update(config_binding) {
warn!(log, "Failed to write config {}: {:?}", &config.name(), e);
@@ -6,12 +6,12 @@ use multiservice_discovery_shared::filters::node_regex_id_filter::NodeIDRegexFil
use multiservice_discovery_shared::filters::{TargetGroupFilter, TargetGroupFilterList};
use multiservice_discovery_shared::{
builders::{
log_vector_config_structure::VectorConfigBuilderImpl,
prometheus_config_structure::PrometheusConfigBuilder, ConfigBuilder,
log_vector_config_structure::VectorConfigBuilderImpl, prometheus_config_structure::PrometheusConfigBuilder,
ConfigBuilder,
},
contracts::target::TargetDto,
};
use service_discovery::job_types::{JobType, NodeOS};
use service_discovery::job_types::JobType;
use slog::{debug, info, warn, Logger};
use std::{
collections::hash_map::DefaultHasher,
@@ -49,10 +49,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
},
recv(interval) -> msg => msg.expect("tick failed!")
};
info!(
logger,
"Downloading from {} @ interval {:?}", cli.sd_url, tick
);
info!(logger, "Downloading from {} @ interval {:?}", cli.sd_url, tick);

let response = match client.get(cli.sd_url.clone()).send().await {
Ok(res) => res,
@@ -91,10 +88,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece

let mut hasher = DefaultHasher::new();

let targets = targets
.into_iter()
.filter(|f| filters.filter(f))
.collect::<Vec<_>>();
let targets = targets.into_iter().filter(|f| filters.filter(f)).collect::<Vec<_>>();

for target in &targets {
target.hash(&mut hasher);
@@ -103,10 +97,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
let hash = hasher.finish();

if current_hash != hash {
info!(
logger,
"Received new targets from {} @ interval {:?}", cli.sd_url, tick
);
info!(logger, "Received new targets from {} @ interval {:?}", cli.sd_url, tick);
current_hash = hash;

generate_config(&cli, targets, logger.clone());
@@ -116,17 +107,8 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece

fn generate_config(cli: &CliArgs, targets: Vec<TargetDto>, logger: Logger) {
let jobs = match cli.generator {
crate::Generator::Log(_) => vec![
JobType::NodeExporter(NodeOS::Guest),
JobType::NodeExporter(NodeOS::Host),
],
crate::Generator::Metric => vec![
JobType::NodeExporter(NodeOS::Guest),
JobType::NodeExporter(NodeOS::Host),
JobType::Orchestrator,
JobType::Replica,
JobType::MetricsProxy,
],
crate::Generator::Log(_) => JobType::all_for_logs(),
crate::Generator::Metric => JobType::all_for_ic_nodes(),
};
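
The explicit job lists removed above are replaced by two constructors on JobType. Their bodies are not visible in this diff; a plausible sketch, assuming they simply wrap the lists that were previously inlined here:

// Sketch only: assumed helpers on JobType, mirroring the removed vec![...] lists.
impl JobType {
    // Jobs whose logs are collected: GuestOS and HostOS node exporters.
    pub fn all_for_logs() -> Vec<JobType> {
        vec![
            JobType::NodeExporter(NodeOS::Guest),
            JobType::NodeExporter(NodeOS::Host),
        ]
    }

    // Jobs scraped for metrics on IC nodes, including the GuestOS metrics-proxy.
    pub fn all_for_ic_nodes() -> Vec<JobType> {
        vec![
            JobType::NodeExporter(NodeOS::Guest),
            JobType::NodeExporter(NodeOS::Host),
            JobType::Orchestrator,
            JobType::Replica,
            JobType::MetricsProxy,
        ]
    }
}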

if std::fs::metadata(&cli.output_dir).is_err() {
@@ -148,8 +130,7 @@ fn generate_config(cli: &CliArgs, targets: Vec<TargetDto>, logger: Logger) {
let config = match &cli.generator {
crate::Generator::Log(subtype) => match &subtype.subcommands {
Subtype::SystemdJournalGatewayd { batch_size } => {
VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port)
.build(targets_with_job)
VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port).build(targets_with_job)
}
Subtype::ExecAndJournald {
script_path,
@@ -6,7 +6,7 @@ use serde::Serialize;
use crate::contracts::target::TargetDto;

use super::{
log_vector_config_structure::{handle_ip, VectorRemapTransform},
log_vector_config_structure::VectorRemapTransform,
vector_config_enriched::{VectorConfigEnriched, VectorSource, VectorTransform},
ConfigBuilder,
};
@@ -22,10 +22,7 @@ pub struct ExecLogConfigBuilderImpl {
}

impl ConfigBuilder for ExecLogConfigBuilderImpl {
fn build(
&self,
target_groups: std::collections::BTreeSet<crate::contracts::target::TargetDto>,
) -> String {
fn build(&self, target_groups: std::collections::BTreeSet<crate::contracts::target::TargetDto>) -> String {
let mut config = VectorConfigEnriched::new();
let mut edited_records: Vec<TargetDto> = vec![];

@@ -60,7 +57,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl {
"--url",
format!(
"http://[{}]:{}/entries",
handle_ip(record.clone(), job, is_bn),
job.ip(*record.targets.first().unwrap(), is_bn),
match is_bn {
true => self.bn_port,
false => self.port,
@@ -82,8 +79,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl {
include_stderr: self.include_stderr,
};

let transform =
VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn);
let transform = VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn);

let mut source_map = HashMap::new();
source_map.insert(key.clone(), Box::new(source) as Box<dyn VectorSource>);
@@ -3,8 +3,6 @@ use std::collections::{BTreeSet, HashMap};
use ic_types::PrincipalId;
use serde::Serialize;

use service_discovery::guest_to_host_address;
use service_discovery::job_types::NodeOS;
use service_discovery::{job_types::JobType, TargetGroup};

use crate::builders::vector_config_enriched::VectorSource;
@@ -68,7 +66,7 @@ pub(crate) fn from_targets_into_vector_config(
let key = format!("{}-{}", key, job);
let source = VectorSystemdGatewayJournaldSource {
_type: "systemd_journal_gatewayd".into(),
endpoint: handle_ip(record.clone(), job, is_bn),
endpoint: job.ip(*record.targets.first().unwrap(), is_bn).to_string(),
data_dir: "logs".to_string(),
batch_size: builder.batch_size,
port: match is_bn {
@@ -77,8 +75,7 @@
},
};
let source_key = format!("{}-source", key);
let transform =
VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn);
let transform = VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn);

let mut sources_map = HashMap::new();
sources_map.insert(source_key, Box::new(source) as Box<dyn VectorSource>);
@@ -134,28 +131,29 @@ const NODE_PROVIDER_ID: &str = "node_provider_id";
impl VectorRemapTransform {
pub fn from(target: TargetDto, job: JobType, input: String, is_bn: bool) -> Self {
let target_group = Into::<TargetGroup>::into(&target);
let mut labels: HashMap<String, String> = HashMap::new();

let anonymous = PrincipalId::new_anonymous().to_string();
let mut node_id = target_group.node_id.to_string();
if node_id == anonymous {
node_id = target.clone().name
}

let endpoint = handle_ip(target.clone(), &job, is_bn);

labels.insert(IC_NAME.into(), target_group.ic_name.to_string());
labels.insert(IC_NODE.into(), node_id.clone());
labels.insert(ADDRESS.into(), endpoint);
labels.insert(
NODE_PROVIDER_ID.into(),
target_group.node_provider_id.to_string(),
);
labels.insert(DC.into(), target_group.dc_id);
labels.extend(target.custom_labels);
if let Some(subnet_id) = target_group.subnet_id {
labels.insert(IC_SUBNET.into(), subnet_id.to_string());
}
let ip = job.ip(*target.targets.first().unwrap(), is_bn).to_string();
let labels = HashMap::from([
(IC_NAME.into(), target_group.ic_name.to_string()),
(IC_NODE.into(), node_id.clone()),
(ADDRESS.into(), ip),
(NODE_PROVIDER_ID.into(), target_group.node_provider_id.to_string()),
(DC.into(), target_group.dc_id),
])
.into_iter()
.chain(target.custom_labels)
.chain(match target_group.subnet_id {
Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())],
None => vec![],
})
.collect::<HashMap<_, _>>();

Self {
_type: "remap".into(),
inputs: vec![input],
@@ -170,32 +168,6 @@ impl VectorRemapTransform {
}
}

pub fn handle_ip(target_group: TargetDto, job_type: &JobType, is_bn: bool) -> String {
match job_type {
JobType::NodeExporter(NodeOS::Guest) => {
target_group.targets.first().unwrap().ip().to_string()
}
JobType::NodeExporter(NodeOS::Host) => match is_bn {
true => target_group.targets.first().unwrap().ip().to_string(),
false => guest_to_host_address(*target_group.targets.first().unwrap())
.unwrap()
.ip()
.to_string(),
},
JobType::MetricsProxy => match is_bn {
// It should not be possible for this to ever be true.
// There is a structural typing problem somewhere here.
true => target_group.targets.first().unwrap().ip().to_string(),
false => guest_to_host_address(*target_group.targets.first().unwrap())
.unwrap()
.ip()
.to_string(),
},
JobType::Replica => panic!("Unsupported job type for handle_ip"),
JobType::Orchestrator => panic!("Unsupported job type for handle_ip"),
}
}
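
The handle_ip helper above is deleted in favor of a JobType::ip method used at the former call sites. Its body is not shown in this diff; a hedged sketch, assuming it centralizes the same guest-to-host address translation the removed function performed:

// Sketch only: assumed JobType::ip helper replacing handle_ip.
// `target` is the socket address recorded for the node; `is_bn` marks a
// boundary node, which has no separate HostOS address.
// `guest_to_host_address` is the existing service_discovery helper whose
// import is removed from this file above.
impl JobType {
    pub fn ip(&self, target: std::net::SocketAddr, is_bn: bool) -> std::net::IpAddr {
        match self {
            // HostOS-scoped scrapes (host node exporter, metrics-proxy) derive
            // the HostOS address from the GuestOS address, except on boundary nodes.
            JobType::NodeExporter(NodeOS::Host) | JobType::MetricsProxy if !is_bn => {
                guest_to_host_address(target)
                    .expect("failed to derive HostOS address")
                    .ip()
            }
            // Guest-scoped jobs, and any job on a boundary node, are assumed to
            // use the recorded address as-is; how Replica and Orchestrator are
            // handled in the real method is not visible in this diff.
            _ => target.ip(),
        }
    }
}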

#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
@@ -217,9 +189,7 @@ mod tests {
let mut parts = ipv6.split(':');

for item in &mut array {
*item = u16::from_str_radix(parts.next().unwrap(), 16)
.unwrap()
.to_be();
*item = u16::from_str_radix(parts.next().unwrap(), 16).unwrap().to_be();
}

array