Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webhook: cache miss fallback to direct client for ScaledObject #6186

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524))
Expand Down
38 changes: 33 additions & 5 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -44,14 +45,31 @@ import (
var scaledobjectlog = logf.Log.WithName("scaledobject-validation-webhook")

var kc client.Client
var cacheMissToDirectClient bool
var directClient client.Client
var restMapper meta.RESTMapper

var memoryString = "memory"
var cpuString = "cpu"

func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager, cacheMissFallback bool) error {
kc = mgr.GetClient()
restMapper = mgr.GetRESTMapper()
cacheMissToDirectClient = cacheMissFallback
if cacheMissToDirectClient {
cfg := mgr.GetConfig()
opts := client.Options{
HTTPClient: mgr.GetHTTPClient(),
Scheme: mgr.GetScheme(),
Mapper: restMapper,
Cache: nil, // this disables the cache and explicitly uses the direct client
}
var err error
directClient, err = client.New(cfg, opts)
if err != nil {
return fmt.Errorf("failed to initialize direct client: %w", err)
}
}
return ctrl.NewWebhookManagedBy(mgr).
WithValidator(&ScaledObjectCustomValidator{}).
For(so).
Expand Down Expand Up @@ -312,6 +330,18 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string, _ bool) error
return nil
}

// getFromCacheOrDirect is a helper function that tries to get an object from the cache
// if it fails, it tries to get it from the direct client
func getFromCacheOrDirect(ctx context.Context, key client.ObjectKey, obj client.Object) error {
err := kc.Get(ctx, key, obj, &client.GetOptions{})
if cacheMissToDirectClient {
if kerrors.IsNotFound(err) {
return directClient.Get(ctx, key, obj, &client.GetOptions{})
}
}
return err
}

func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool) error {
if dryRun {
return nil
Expand All @@ -334,15 +364,13 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string, dryRun bool
switch incomingSoGckr.GVKString() {
case "apps/v1.Deployment":
deployment := &appsv1.Deployment{}
err := kc.Get(context.Background(), key, deployment, &client.GetOptions{})
if err != nil {
if err := getFromCacheOrDirect(context.Background(), key, deployment); err != nil {
return err
}
podSpec = &deployment.Spec.Template.Spec
case "apps/v1.StatefulSet":
statefulset := &appsv1.StatefulSet{}
err := kc.Get(context.Background(), key, statefulset, &client.GetOptions{})
if err != nil {
if err := getFromCacheOrDirect(context.Background(), key, statefulset); err != nil {
return err
}
podSpec = &statefulset.Spec.Template.Spec
Expand Down
2 changes: 1 addition & 1 deletion apis/keda/v1alpha1/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())

err = (&ScaledObject{}).SetupWebhookWithManager(mgr)
err = (&ScaledObject{}).SetupWebhookWithManager(mgr, false)
Expect(err).NotTo(HaveOccurred())
err = (&ScaledJob{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
Expand Down
8 changes: 5 additions & 3 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func main() {
var webhooksClientRequestBurst int
var certDir string
var webhooksPort int
var cacheMissToDirectClient bool

pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -70,6 +71,7 @@ func main() {
pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs")
pflag.IntVar(&webhooksPort, "port", 9443, "Port number to serve webhooks. Defaults to 9443")
pflag.BoolVar(&cacheMissToDirectClient, "cache-miss-to-direct-client", false, "If true, on cache misses the webhook will call the direct client to fetch the object")

opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -117,7 +119,7 @@ func main() {

kedautil.PrintWelcome(setupLog, kubeVersion, "admission webhooks")

setupWebhook(mgr)
setupWebhook(mgr, cacheMissToDirectClient)

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
Expand All @@ -134,9 +136,9 @@ func main() {
}
}

func setupWebhook(mgr manager.Manager) {
func setupWebhook(mgr manager.Manager, cacheMissToDirectClient bool) {
// setup webhooks
if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr); err != nil {
if err := (&kedav1alpha1.ScaledObject{}).SetupWebhookWithManager(mgr, cacheMissToDirectClient); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ScaledObject")
os.Exit(1)
}
Expand Down
Loading