Skip to content

Commit

Permalink
Replace informer
Browse files Browse the repository at this point in the history
Signed-off-by: Ben <[email protected]>
  • Loading branch information
slashben committed Dec 14, 2023
1 parent cc87e9d commit 975be89
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 38 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ close-shell:
cat cop_pids.txt | xargs kill -15

deploy-dev-pod:
kubectl apply -f etc/app-profile.crd.yaml
kubectl apply -f chart/kubecop/crds/app-profile.crd.yaml -f chart/kubecop/crds/runtime-rule-binding.crd.yaml
kubectl apply -f dev/devpod.yaml

build: $(BINARY_NAME)
Expand All @@ -48,4 +48,4 @@ validate-crd:

all: $(BINARY_NAME)

.PHONY: clean all install deploy-dev-pod test open-shell build validate-crd
.PHONY: clean all install deploy-dev-pod test open-shell build validate-crd
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gammazero/workerpool v1.1.3
github.com/go-openapi/strfmt v0.21.7
github.com/inspektor-gadget/inspektor-gadget v0.23.1
github.com/kubescape/kapprofiler v0.0.35
github.com/kubescape/kapprofiler v0.0.36
github.com/prometheus/alertmanager v0.26.0
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubescape/kapprofiler v0.0.35 h1:AwMVeov7SEH96iMOd7JLSlozot1XdDFpoMp/r753rCM=
github.com/kubescape/kapprofiler v0.0.35/go.mod h1:7BibpEHD1y1CY+XLUQpjCQR3AN7sO9N0oxbaL++2s+s=
github.com/kubescape/kapprofiler v0.0.36 h1:j4WHqopGgCzDLtFPHbMrpr5ZhPFxS2kINaknaxZjaIA=
github.com/kubescape/kapprofiler v0.0.36/go.mod h1:3YmtBCW5mspDNXlH148xHgbxyHzr1+bIdYU7iLCHfl0=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
Expand Down
69 changes: 36 additions & 33 deletions pkg/approfilecache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"strings"

"github.com/kubescape/kapprofiler/pkg/collector"
"github.com/kubescape/kapprofiler/pkg/watcher"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type ApplicationProfileCacheEntry struct {
Expand All @@ -31,8 +30,9 @@ type ApplicationProfileK8sCache struct {
k8sConfig *rest.Config
dynamicClient *dynamic.DynamicClient

cache map[string]*ApplicationProfileCacheEntry
informerControlChannel chan struct{}
cache map[string]*ApplicationProfileCacheEntry

applicationProfileWatcher watcher.WatcherInterface

promCollector *prometheusMetric
}
Expand Down Expand Up @@ -69,20 +69,21 @@ func NewApplicationProfileK8sCache(k8sConfig *rest.Config) (*ApplicationProfileK
return nil, err
}
cache := make(map[string]*ApplicationProfileCacheEntry)
controlChannel := make(chan struct{})
newApplicationCache := ApplicationProfileK8sCache{
k8sConfig: k8sConfig,
dynamicClient: dynamicClient,
cache: cache,
informerControlChannel: controlChannel,
promCollector: createPrometheusMetric(),
k8sConfig: k8sConfig,
dynamicClient: dynamicClient,
cache: cache,
applicationProfileWatcher: watcher.NewWatcher(dynamicClient),
promCollector: createPrometheusMetric(),
}
newApplicationCache.StartController()
return &newApplicationCache, nil
}

func (cache *ApplicationProfileK8sCache) Destroy() {
close(cache.informerControlChannel)
if cache.applicationProfileWatcher != nil {
cache.applicationProfileWatcher.Stop()
}
cache.promCollector.destroy()
}

Expand Down Expand Up @@ -199,32 +200,34 @@ func (access *ApplicationProfileAccessImpl) GetDNS() (*[]collector.DnsCalls, err
}

func (c *ApplicationProfileK8sCache) StartController() {
// Initialize factory and informer
informer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(c.dynamicClient, 0, metav1.NamespaceAll, func(lo *metav1.ListOptions) {
lo.LabelSelector = "kapprofiler.kubescape.io/final=true"
}).ForResource(collector.AppProfileGvr).Informer()

// Add event handlers to informer
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { // Called when an ApplicationProfile is added
c.promCollector.reportApplicationProfileCreated()
c.handleApplicationProfile(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) { // Called when an ApplicationProfile is updated
c.promCollector.reportApplicationProfileUpdated()
c.handleApplicationProfile(newObj)

err := c.applicationProfileWatcher.Start(
watcher.WatchNotifyFunctions{
AddFunc: func(obj *unstructured.Unstructured) {
c.promCollector.createCounter.Inc()
c.handleApplicationProfile(obj)
},
UpdateFunc: func(obj *unstructured.Unstructured) {
c.promCollector.updateCounter.Inc()
c.handleApplicationProfile(obj)
},
DeleteFunc: func(obj *unstructured.Unstructured) {
c.promCollector.deleteCounter.Inc()
c.handleDeleteApplicationProfile(obj)
},
},
DeleteFunc: func(obj interface{}) { // Called when an ApplicationProfile is deleted
c.promCollector.reportApplicationProfileDeleted()
c.handleDeleteApplicationProfile(obj)
collector.AppProfileGvr,
metav1.ListOptions{
LabelSelector: "kapprofiler.kubescape.io/final=true",
},
})
)

// Run the informer
go informer.Run(c.informerControlChannel)
if err != nil {
log.Printf("Failed to start application profile watcher: %v\n", err)
}
}

func (c *ApplicationProfileK8sCache) handleApplicationProfile(obj interface{}) {
func (c *ApplicationProfileK8sCache) handleApplicationProfile(obj *unstructured.Unstructured) {
appProfile, err := getApplicationProfileFromObj(obj)
if err != nil {
log.Printf("Failed to get application profile from object: %v\n", err)
Expand Down Expand Up @@ -267,7 +270,7 @@ func (c *ApplicationProfileK8sCache) handleApplicationProfile(obj interface{}) {
}
}

func (c *ApplicationProfileK8sCache) handleDeleteApplicationProfile(obj interface{}) {
func (c *ApplicationProfileK8sCache) handleDeleteApplicationProfile(obj *unstructured.Unstructured) {
appProfile, err := getApplicationProfileFromObj(obj)
if err != nil {
log.Printf("Failed to get application profile from object: %v\n", err)
Expand Down

0 comments on commit 975be89

Please sign in to comment.