Skip to content

Commit

Permalink
feat: add activation feature for CPU/Memory scaler
Browse files Browse the repository at this point in the history
Signed-off-by: kunwooy <[email protected]>
  • Loading branch information
kunwooy committed Nov 7, 2024
1 parent 4eb7149 commit 87f5419
Show file tree
Hide file tree
Showing 42 changed files with 6,992 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Here is an overview of all new **experimental** features:
- **General**: Prevent multiple ScaledObjects managing one HPA ([#6130](https://github.com/kedacore/keda/issues/6130))
- **General**: Show full triggers'types and authentications'types in status ([#6187](https://github.com/kedacore/keda/issues/6187))
- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **CPU/Memory Scaler**: Add activation feature ([#6057](https://github.com/kedacore/keda/issues/6057))
- **Elasticsearch Scaler**: Support Query at the Elasticsearch scaler ([#6216](https://github.com/kedacore/keda/issues/6216))
- **Etcd Scaler**: Add username and password support for etcd ([#6199](https://github.com/kedacore/keda/pull/6199))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
Expand Down
11 changes: 9 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -59,7 +60,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(metricsv1beta1.AddToScheme(scheme))
utilruntime.Must(kedav1alpha1.AddToScheme(scheme))
utilruntime.Must(eventingv1alpha1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
Expand Down Expand Up @@ -217,7 +218,13 @@ func main() {
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
podMetricsClient, err := k8s.InitPodMetricsClient(mgr)
if err != nil {
setupLog.Error(err, "unable to init metrics client")
os.Exit(1)
}

scaledHandler := scaling.NewScaleHandler(mgr.GetClient(), scaleClient, podMetricsClient, mgr.GetScheme(), globalHTTPTimeout, eventRecorder, secretInformer.Lister())
eventEmitter := eventemitter.NewEventEmitter(mgr.GetClient(), eventRecorder, k8sClusterName, secretInformer.Lister())

if err = (&kedacontrollers.ScaledObjectReconciler{
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func init() {

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"), r.SecretsLister)
r.scaledJobGenerations = &sync.Map{}
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
Expand Down
43 changes: 42 additions & 1 deletion controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
metrics "k8s.io/metrics/pkg/apis/metrics/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
Expand Down Expand Up @@ -750,6 +751,14 @@ var _ = Describe("ScaledObjectController", func() {
},
},
},
Status: kedav1alpha1.ScaledObjectStatus{
ScaleTargetGVKR: &kedav1alpha1.GroupVersionKindResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
Kind: "Deployment",
},
},
}
Eventually(func() error {
return k8sClient.Create(context.Background(), so)
Expand Down Expand Up @@ -790,13 +799,16 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() error {
return k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
}).ShouldNot(HaveOccurred())

averageUtilization := int32(100)
hpa.Status.CurrentMetrics = []autoscalingv2.MetricStatus{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
AverageUtilization: &averageUtilization,
},
},
},
Expand All @@ -805,6 +817,30 @@ var _ = Describe("ScaledObjectController", func() {
return k8sClient.Status().Update(ctx, hpa)
}).ShouldNot(HaveOccurred())

// create pod metrics
podMetrics := &metrics.PodMetrics{
TypeMeta: metav1.TypeMeta{
Kind: "PodMetrics",
APIVersion: "metrics.k8s.io/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: deploymentName,
Namespace: "default",
Labels: map[string]string{
"app": deploymentName,
},
},
Containers: []metrics.ContainerMetrics{
{
Name: deploymentName,
Usage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
},
},
},
}
Eventually(func() error { return k8sClient.Create(ctx, podMetrics) }).ShouldNot(HaveOccurred())

// hpa metrics will only left CPU metric
Eventually(func() int {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
Expand Down Expand Up @@ -1463,6 +1499,11 @@ func generateDeployment(name string) *appsv1.Deployment {
{
Name: name,
Image: name,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
},
},
},
},
},
Expand Down
8 changes: 7 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -91,10 +92,13 @@ var _ = BeforeSuite(func() {
scaleClient, _, err := k8s.InitScaleClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

metricsClient, err := k8s.InitPodMetricsClient(k8sManager)
Expect(err).ToNot(HaveOccurred())

err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, metricsClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
}).SetupWithManager(k8sManager, controller.Options{})
Expand All @@ -107,6 +111,8 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

Expect(metricsv1beta1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).ToNot(HaveOccurred())
Expect(k8sClient).ToNot(BeNil())
Expand Down
21 changes: 21 additions & 0 deletions pkg/k8s/podmetrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package k8s

import (
"fmt"

"k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
)

var metricsClientLog = ctrl.Log.WithName("metricsclient")

// InitPodMetricsClient initializes the client for pod metrics. It is used to fetch pod metrics from the metrics server.
func InitPodMetricsClient(mgr ctrl.Manager) (v1beta1.PodMetricsesGetter, error) {
clientset, err := v1beta1.NewForConfig(mgr.GetConfig())
if err != nil {
metricsClientLog.Error(err, "not able to create metrics client")
return nil, fmt.Errorf("failed to create metrics clientset: %w", err)
}

return clientset, nil
}
Loading

0 comments on commit 87f5419

Please sign in to comment.