Skip to content

Commit

Permalink
Merge pull request karmada-io#5835 from RainbowMango/pr_adopt_typed_r…
Browse files Browse the repository at this point in the history
…ate_limiter_asyncworker

Adopt generic ratelimter queue
  • Loading branch information
karmada-bot authored Nov 21, 2024
2 parents 2c82055 + 02861c5 commit 87cfdfc
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/metricsadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type MetricsController struct {
InformerManager genericmanager.MultiClusterInformerManager
TypedInformerManager typedmanager.MultiClusterInformerManager
MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[any]
restConfig *rest.Config
}

Expand All @@ -70,7 +70,7 @@ func NewMetricsController(stopCh <-chan struct{}, restConfig *rest.Config, facto
InformerManager: genericmanager.GetInstance(),
TypedInformerManager: newInstance(stopCh),
restConfig: restConfig,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
Name: "metrics-adapter",
}),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Scheduler struct {
// ResourceBinding/ClusterResourceBinding rescheduling.
clusterReconcileWorker util.AsyncWorker
// TODO: implement a priority scheduling queue
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[any]

Algorithm core.ScheduleAlgorithm
schedulerCache schedulercache.Cache
Expand Down Expand Up @@ -239,7 +239,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
for _, opt := range opts {
opt(&options)
}
queue := workqueue.NewRateLimitingQueueWithConfig(ratelimiterflag.LegacyControllerRateLimiter(options.RateLimiterOptions), workqueue.RateLimitingQueueConfig{Name: "scheduler-queue"})
queue := workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](options.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{Name: "scheduler-queue"})
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.outOfTreeRegistry); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func TestWorkerAndScheduleNext(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]())
bindingLister := &fakeBindingLister{binding: resourceBinding}
clusterBindingLister := &fakeClusterBindingLister{binding: clusterResourceBinding}

Expand Down
4 changes: 2 additions & 2 deletions pkg/search/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Controller struct {
restMapper meta.RESTMapper
informerFactory informerfactory.SharedInformerFactory
clusterLister clusterlister.ClusterLister
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[any]

clusterRegistry sync.Map

Expand All @@ -78,7 +78,7 @@ type Controller struct {
// NewController returns a new ResourceRegistry controller
func NewController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, restMapper meta.RESTMapper) (*Controller, error) {
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
queue := workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]())

c := &Controller{
restConfig: restConfig,
Expand Down
4 changes: 2 additions & 2 deletions pkg/servicenameresolutiondetector/coredns/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Detector struct {
nodeName string
clusterName string

queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[any]
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func NewCorednsDetector(memberClusterClient kubernetes.Interface, karmadaClient
cacheSynced: []cache.InformerSynced{nodeInformer.Informer().HasSynced},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: name}),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: name}),
lec: leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: baselec.LeaseDuration.Duration,
Expand Down
22 changes: 0 additions & 22 deletions pkg/sharedcli/ratelimiterflag/ratelimiterflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,3 @@ func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRat
&workqueue.TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)},
)
}

// LegacyControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
// TODO(@RainbowMango): This function will only used by asyncWorker and will be removed after bump Kubernetes dependency to v1.31.
func LegacyControllerRateLimiter(opts Options) workqueue.RateLimiter {
// set defaults
if opts.RateLimiterBaseDelay <= 0 {
opts.RateLimiterBaseDelay = 5 * time.Millisecond
}
if opts.RateLimiterMaxDelay <= 0 {
opts.RateLimiterMaxDelay = 1000 * time.Second
}
if opts.RateLimiterQPS <= 0 {
opts.RateLimiterQPS = 10
}
if opts.RateLimiterBucketSize <= 0 {
opts.RateLimiterBucketSize = 100
}
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(opts.RateLimiterQPS), opts.RateLimiterBucketSize)},
)
}
4 changes: 2 additions & 2 deletions pkg/util/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type asyncWorker struct {
// reconcileFunc is the function that process keys from the queue.
reconcileFunc ReconcileFunc
// queue allowing parallel processing of resources.
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[any]
}

// Options are the arguments for creating a new AsyncWorker.
Expand All @@ -83,7 +83,7 @@ func NewAsyncWorker(opt Options) AsyncWorker {
return &asyncWorker{
keyFunc: opt.KeyFunc,
reconcileFunc: opt.ReconcileFunc,
queue: workqueue.NewRateLimitingQueueWithConfig(ratelimiterflag.LegacyControllerRateLimiter(opt.RateLimiterOptions), workqueue.RateLimitingQueueConfig{
queue: workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{
Name: opt.Name,
}),
}
Expand Down

0 comments on commit 87cfdfc

Please sign in to comment.