Skip to content

Commit

Permalink
Allow setting MaxConcurrentReconciles for reconcilers (#2272)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik authored Nov 23, 2021
1 parent 81e7d3b commit 9ac7a0a
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2269](https://github.com/kedacore/keda/pull/2269))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve logs of Azure Pipelines Scaler. ([#2297](https://github.com/kedacore/keda/pull/2297))
- Allow setting `MaxConcurrentReconciles` for controllers ([#2272](https://github.com/kedacore/keda/pull/2272))

### Deprecations

Expand Down
25 changes: 13 additions & 12 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -50,6 +49,7 @@ import (
prommetrics "github.com/kedacore/keda/v2/pkg/metrics"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/kedacore/keda/v2/version"
)

Expand All @@ -70,7 +70,7 @@ var (
adapterClientRequestBurst int
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) {
func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration, maxConcurrentReconciles int) (provider.MetricsProvider, <-chan struct{}, error) {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if cfg != nil {
Expand Down Expand Up @@ -116,14 +116,14 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, stopCh); err != nil {
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Namespace: namespace,
Expand All @@ -137,7 +137,7 @@ func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, n
ScaleHandler: scaleHandler,
ExternalMetricsInfo: externalMetricsInfo,
ExternalMetricsInfoLock: externalMetricsInfoLock,
}).SetupWithManager(mgr, controller.Options{}); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
return err
}

Expand Down Expand Up @@ -200,19 +200,20 @@ func main() {

ctrl.SetLogger(logger)

globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
controllerMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_METRICS_CTRL_MAX_RECONCILES", 1)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
logger.Error(err, "Invalid KEDA_METRICS_CTRL_MAX_RECONCILES")
return
}

kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond, controllerMaxReconciles)
if err != nil {
logger.Error(err, "making provider")
return
Expand Down
1 change: 1 addition & 0 deletions controllers/keda/metrics_adapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type MetricsScaledObjectReconciler struct {
ScaleHandler scaling.ScaleHandler
ExternalMetricsInfo *[]provider.ExternalMetricInfo
ExternalMetricsInfoLock *sync.RWMutex
MaxConcurrentReconciles int
}

var (
Expand Down
7 changes: 5 additions & 2 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -49,14 +50,16 @@ type ScaledJobReconciler struct {
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler

scaleHandler scaling.ScaleHandler
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Expand Down
17 changes: 9 additions & 8 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -75,18 +76,17 @@ type ScaledObjectReconciler struct {
}

// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
var isScalableCache map[string]bool
var isScalableCache *sync.Map

func init() {
// Prefill the cache with some known values for core resources in case of future parallelism to avoid stampeding herd on startup.
isScalableCache = map[string]bool{
"deployments.apps": true,
"statefulsets.apps": true,
}
isScalableCache = &sync.Map{}
isScalableCache.Store("deployments.apps", true)
isScalableCache.Store("statefulsets.apps", true)
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
setupLog := log.Log.WithName("setup")

// create Discovery clientset
Expand Down Expand Up @@ -117,6 +117,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {

// Start controller
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// predicate.GenerationChangedPredicate{} ignore updates to ScaledObject Status
// (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
Expand Down Expand Up @@ -284,7 +285,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
isScalable := isScalableCache[gr.String()]
_, isScalable := isScalableCache.Load(gr.String())
if !isScalable || wantStatusUpdate {
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
Expand All @@ -303,7 +304,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
isScalableCache[gr.String()] = true
isScalableCache.Store(gr.String(), true)
}

// if it is not already present in ScaledObject Status:
Expand Down
3 changes: 2 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -85,7 +86,7 @@ var _ = BeforeSuite(func(done Done) {
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
}).SetupWithManager(k8sManager)
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expand Down
28 changes: 18 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"time"

apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/kedacore/keda/v2/version"
//+kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -96,16 +97,23 @@ func main() {
os.Exit(1)
}

globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
setupLog.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
os.Exit(1)
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
scaledObjectMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_SCALEDOBJECT_CTRL_MAX_RECONCILES", 5)
if err != nil {
setupLog.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
setupLog.Error(err, "Invalid KEDA_SCALEDOBJECT_CTRL_MAX_RECONCILES")
os.Exit(1)
}

scaledJobMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_SCALEDJOB_CTRL_MAX_RECONCILES", 1)
if err != nil {
setupLog.Error(err, "Invalid KEDA_SCALEDJOB_CTRL_MAX_RECONCILES")
os.Exit(1)
}

globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
Expand All @@ -116,7 +124,7 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledObjectMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
}
Expand All @@ -125,7 +133,7 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/env_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package util

import (
"os"
"strconv"
)

func ResolveOsEnvInt(envName string, defaultValue int) (int, error) {
valueStr, found := os.LookupEnv(envName)

if found && valueStr != "" {
return strconv.Atoi(valueStr)
}

return defaultValue, nil
}

0 comments on commit 9ac7a0a

Please sign in to comment.