Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use replicaset and Job MetaGen based on watchers #35483

Merged
merged 17 commits into from
May 30, 2023
Merged
15 changes: 14 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,20 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
if err != nil {
logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)
replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}

metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

p := &pod{
config: config,
Expand Down
15 changes: 14 additions & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,21 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
if err != nil {
k.log.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}
replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
}
jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
}

// TODO: refactor the above section to a common function to be used by NeWPodEventer too
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, metaConf)
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf)

k.indexers = NewIndexers(config.Indexers, metaGen)
k.watcher = watcher
Expand Down
46 changes: 44 additions & 2 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,28 @@ func NewResourceMetadataEnricher(
}
cfg, _ := conf.NewConfigFrom(&commonMetaConfig)

podMetaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata)
client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
logp.Err("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
return &nilEnricher{}
}

jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
return &nilEnricher{}
}

podMetaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, config.AddResourceMetadata)

namespaceMeta := metadata.NewNamespaceMetadataGenerator(config.AddResourceMetadata.Namespace, namespaceWatcher.Store(), watcher.Client())
serviceMetaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, watcher.Client())
Expand Down Expand Up @@ -261,14 +282,35 @@ func NewContainerMetadataEnricher(
return &nilEnricher{}
}

client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
logp.Err("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
replicaSetWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
return &nilEnricher{}
}

jobWatcher, err := kubernetes.NewNamedWatcher("resource_metadata_enricher_job", client, &kubernetes.Job{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
logp.Err("Error creating watcher for %T due to error %+v", &kubernetes.Job{}, err)
return &nilEnricher{}
}

commonMetaConfig := metadata.Config{}
if err := base.Module().UnpackConfig(&commonMetaConfig); err != nil {
logp.Err("Error initializing Kubernetes metadata enricher: %s", err)
return &nilEnricher{}
}
cfg, _ := conf.NewConfigFrom(&commonMetaConfig)

metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, config.AddResourceMetadata)
metaGen := metadata.GetPodMetaGen(cfg, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, config.AddResourceMetadata)

enricher := buildMetadataEnricher(watcher, nodeWatcher, namespaceWatcher,
// update
Expand Down