
feat(kubernetes): Use workqueues
This uses a workqueue as a layer of indirection between:
- Events coming in from the K8s ListWatcher
- The handlers that update ContainerStats

This is necessary for 2 reasons.

1. SharedIndexInformers do not deal well with large backlogs of events
2. ObjListWatcher.Mx is shared 😱 with Collector.Mx, meaning that
   we pause processing K8s events while preparing metrics... and
   similarly pause preparing metrics while handling a K8s event.

We should revisit the design of the watcher to avoid sharing both
`ContainerStats` and `Mx` with the Collector; this coupling is not a
great design.
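Below is a minimal, self-contained sketch of the pattern this commit adopts (not the commit's code; the toy `stats` map and pod keys are illustrative only): the informer event handlers do nothing but enqueue keys, while a single rate-limited worker drains the queue, taking the shared mutex only briefly per item.

```go
// Sketch only: handlers enqueue keys, one worker drains them under a shared mutex.
// Only the client-go workqueue API is real; everything else is a stand-in.
package main

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	var mx sync.Mutex         // stands in for the mutex shared with the Collector
	stats := map[string]int{} // stands in for ContainerStats

	// "Event handlers": only enqueue keys, never touch stats or the mutex.
	go func() {
		for i := 0; i < 10; i++ {
			queue.Add(fmt.Sprintf("default/pod-%d", i))
		}
	}()

	// Single worker: the only goroutine here that takes the shared mutex.
	go func() {
		for {
			key, quit := queue.Get()
			if quit {
				return
			}
			mx.Lock()
			stats[key.(string)]++ // keep the critical section short
			mx.Unlock()
			queue.Forget(key) // success: reset any rate-limit backoff for this key
			queue.Done(key)
		}
	}()

	time.Sleep(500 * time.Millisecond)
	queue.ShutDown()
}
```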

Signed-off-by: Dave Tucker <[email protected]>
dave-tucker committed Jul 31, 2024
1 parent 287496a commit b424607
Showing 1 changed file with 93 additions and 38 deletions.
131 changes: 93 additions & 38 deletions pkg/kubernetes/watcher.go
@@ -22,12 +22,14 @@ import (
"sync"
"time"

k8sv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/sustainable-computing-io/kepler/pkg/bpf"
@@ -38,6 +40,13 @@ import (
const (
informerTimeout = time.Minute
podResourceType = "pods"
+ // Number of retries to process an event
+ maxRetries = 5
+ // Number of workers to process events
+ // NOTE: Given that the ContainerStats map is protected under a shared mutex,
+ // the number of workers should be kept at 1. Otherwise, we might starve
+ // the collector.
+ workers = 1
)

var (
@@ -47,14 +56,17 @@ var (

type ObjListWatcher struct {
// Lock to synchronize the collector update with the watcher
+ // NOTE: This lock is shared with the Collector
Mx *sync.Mutex

k8sCli *kubernetes.Clientset
ResourceKind string
- informer cache.SharedInformer
+ informer cache.SharedIndexInformer
+ workqueue workqueue.RateLimitingInterface
stopChannel chan struct{}
bpfSupportedMetrics bpf.SupportedMetrics

+ // NOTE: This map is shared with the Collector
// ContainerStats holds all container energy and resource usage metrics
ContainerStats map[string]*stats.ContainerStats
}
@@ -89,6 +101,7 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
k8sCli: newK8sClient(),
ResourceKind: podResourceType,
bpfSupportedMetrics: bpfSupportedMetrics,
+ workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
if w.k8sCli == nil || !config.EnableAPIServer {
return w
@@ -102,18 +115,26 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
metav1.NamespaceAll,
optionsModifier,
)

- w.informer = cache.NewSharedInformer(objListWatcher, &k8sv1.Pod{}, 0)
+ w.informer = cache.NewSharedIndexInformer(objListWatcher, &corev1.Pod{}, 0, cache.Indexers{})
w.stopChannel = make(chan struct{})
_, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- w.handleAdd(obj)
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err == nil {
+ w.workqueue.Add(key)
+ }
},
- UpdateFunc: func(oldObj interface{}, newObj interface{}) {
- w.handleUpdate(oldObj, newObj)
+ UpdateFunc: func(old interface{}, new interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(new)
+ if err == nil {
+ w.workqueue.Add(key)
+ }
},
DeleteFunc: func(obj interface{}) {
- w.handleDeleted(obj)
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err == nil {
+ w.workqueue.Add(key)
+ }
},
})
if err != nil {
@@ -123,12 +144,60 @@ func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher
return w
}

+ func (w *ObjListWatcher) processNextItem() bool {
+ key, quit := w.workqueue.Get()
+ if quit {
+ return false
+ }
+ defer w.workqueue.Done(key)
+
+ err := w.handleEvent(key.(string))
+ w.handleErr(err, key)
+ return true
+ }
+
+ func (w *ObjListWatcher) handleErr(err error, key interface{}) {
+ // No error!
+ if err == nil {
+ w.workqueue.Forget(key)
+ return
+ }
+
+ // Retry
+ if w.workqueue.NumRequeues(key) < maxRetries {
+ klog.Errorf("Error syncing pod %v: %v", key, err)
+ w.workqueue.AddRateLimited(key)
+ return
+ }
+
+ // Give up
+ w.workqueue.Forget(key)
+ klog.Infof("Dropping pod %q out of the queue: %v", key, err)
+ }
+
+ func (w *ObjListWatcher) handleEvent(key string) error {
+ obj, exists, err := w.informer.GetIndexer().GetByKey(key)
+ if err != nil {
+ klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
+ return err
+ }
+ if !exists {
+ w.handleDeleted(obj)
+ } else {
+ w.handleAdd(obj)
+ }
+ return nil
+ }
+
func (w *ObjListWatcher) Run() {
if !IsWatcherEnabled {
klog.Infoln("k8s APIserver watcher was not enabled")
return
}
+ defer w.workqueue.ShutDown()

go w.informer.Run(w.stopChannel)

timeoutCh := make(chan struct{})
timeoutTimer := time.AfterFunc(informerTimeout, func() {
close(timeoutCh)
@@ -137,49 +206,35 @@ func (w *ObjListWatcher) Run() {
if !cache.WaitForCacheSync(timeoutCh, w.informer.HasSynced) {
klog.Fatalf("watcher timed out waiting for caches to sync")
}

+ // launch workers to handle events
+ for i := 0; i < workers; i++ {
+ go wait.Until(w.runWorker, time.Second, w.stopChannel)
+ }

klog.Infoln("k8s APIserver watcher was started")
}

+ func (w *ObjListWatcher) runWorker() {
+ for w.processNextItem() {
+ }
+ }

func (w *ObjListWatcher) Stop() {
klog.Infoln("k8s APIserver watcher was stopped")
close(w.stopChannel)
}

- func (w *ObjListWatcher) handleUpdate(oldObj, newObj interface{}) {
- switch w.ResourceKind {
- case podResourceType:
- oldPod, ok := oldObj.(*k8sv1.Pod)
- if !ok {
- klog.Infof("Could not convert obj: %v", w.ResourceKind)
- return
- }
- newPod, ok := newObj.(*k8sv1.Pod)
- if !ok {
- klog.Infof("Could not convert obj: %v", w.ResourceKind)
- return
- }
- if newPod.ResourceVersion == oldPod.ResourceVersion {
- // Periodic resync will send update events for all known pods.
- // Two different versions of the same pod will always have different RVs.
- return
- }
- w.handleAdd(newObj)
- default:
- klog.Infof("Watcher does not support object type %s", w.ResourceKind)
- return
- }
- }

func (w *ObjListWatcher) handleAdd(obj interface{}) {
switch w.ResourceKind {
case podResourceType:
- pod, ok := obj.(*k8sv1.Pod)
+ pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Infof("Could not convert obj: %v", w.ResourceKind)
return
}
for _, condition := range pod.Status.Conditions {
- if condition.Type != k8sv1.ContainersReady {
+ if condition.Type != corev1.ContainersReady {
continue
}
klog.V(5).Infof("Pod %s %s is ready with %d container statuses, %d init container status, %d ephemeral statues",
@@ -197,7 +252,7 @@ func (w *ObjListWatcher) handleAdd(obj interface{}) {
}
}

- func (w *ObjListWatcher) fillInfo(pod *k8sv1.Pod, containers []k8sv1.ContainerStatus) error {
+ func (w *ObjListWatcher) fillInfo(pod *corev1.Pod, containers []corev1.ContainerStatus) error {
var err error
var exist bool
for j := 0; j < len(containers); j++ {
@@ -221,7 +276,7 @@ func (w *ObjListWatcher) fillInfo(pod *k8sv1.Pod, containers []k8sv1.ContainerSt
func (w *ObjListWatcher) handleDeleted(obj interface{}) {
switch w.ResourceKind {
case podResourceType:
- pod, ok := obj.(*k8sv1.Pod)
+ pod, ok := obj.(*corev1.Pod)
if !ok {
klog.Fatalf("Could not convert obj: %v", w.ResourceKind)
}
@@ -237,7 +292,7 @@ func (w *ObjListWatcher) handleDeleted(obj interface{}) {
}

// TODO: instead of delete, it might be better to mark it to delete since k8s takes time to really delete an object
- func (w *ObjListWatcher) deleteInfo(containers []k8sv1.ContainerStatus) {
+ func (w *ObjListWatcher) deleteInfo(containers []corev1.ContainerStatus) {
for j := 0; j < len(containers); j++ {
containerID := ParseContainerIDFromPodStatus(containers[j].ContainerID)
delete(w.ContainerStats, containerID)
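For reference, a small sketch (not part of this commit) of what the queued keys look like: `cache.MetaNamespaceKeyFunc` produces "namespace/name" strings, which is why `handleEvent` above can resolve a key back to an object through the informer's indexer. The pod below is a made-up example.

```go
// Sketch only: shows the "namespace/name" key format used by the workqueue above.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "kepler", Name: "kepler-exporter-abc12"}}

	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // "kepler/kepler-exporter-abc12"

	// A worker holding only the key can split it back into namespace and name,
	// or pass it to informer.GetIndexer().GetByKey(key) as handleEvent does.
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name)
}
```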
