-
Notifications
You must be signed in to change notification settings - Fork 126
/
Copy pathcontroller.go
1051 lines (928 loc) · 37.4 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/api"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/component-base/featuregate"
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
"k8s.io/controller-manager/pkg/features"
"k8s.io/klog/v2"
)
const (
	// serviceSyncPeriod is the resync period used when registering the
	// service informer event handlers.
	serviceSyncPeriod = 30 * time.Second
	// nodeSyncPeriod is the resync period used when registering the node
	// informer event handlers.
	nodeSyncPeriod = 100 * time.Second
	// minRetryDelay and maxRetryDelay are the bounds handed to the per-item
	// exponential failure rate limiter of the service and node work queues,
	// i.e. how long to wait before retrying the processing of a change.
	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
	// should be changed appropriately.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
	// ToBeDeletedTaint is a taint used by the Cluster Autoscaler before marking a node for deletion. Defined in
	// https://github.com/kubernetes/autoscaler/blob/e80ab518340f88f364fe3ef063f8303755125971/cluster-autoscaler/utils/deletetaint/delete.go#L36
	ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler"
)
// cachedService is the value stored in serviceCache for one service key.
type cachedService struct {
	// The cached state of the service. It is nil for an entry that was just
	// created by getOrCreate and has not been populated yet.
	state *v1.Service
}
// serviceCache maps a service key to the last service state seen for it.
type serviceCache struct {
	mu sync.RWMutex // protects serviceMap (the map itself, not the entries it points to)
	serviceMap map[string]*cachedService
}
// Controller keeps cloud provider service resources
// (like load balancers) in sync with the registry.
type Controller struct {
	// cloud is the cloud provider; init derives balancer from it.
	cloud cloudprovider.Interface
	// kubeClient is used by Run to build the sink that events are recorded to.
	kubeClient clientset.Interface
	// clusterName is passed along on the cloud load balancer calls.
	clusterName string
	// balancer is the cloud provider's load balancer implementation, set by init.
	balancer cloudprovider.LoadBalancer
	// TODO(#85155): Stop relying on this and remove the cache completely.
	cache *serviceCache
	// serviceLister and serviceListerSynced come from the service informer given to New.
	serviceLister corelisters.ServiceLister
	serviceListerSynced cache.InformerSynced
	// eventBroadcaster is started and shut down by Run; eventRecorder emits
	// events attributed to the "service-controller" component.
	eventBroadcaster record.EventBroadcaster
	eventRecorder record.EventRecorder
	// nodeLister and nodeListerSynced come from the node informer given to New.
	nodeLister corelisters.NodeLister
	nodeListerSynced cache.InformerSynced
	// services and nodes that need to be synced
	serviceQueue workqueue.RateLimitingInterface
	nodeQueue workqueue.RateLimitingInterface
	// lastSyncedNodes is used when reconciling node state and keeps track of
	// the last synced set of nodes. This field is concurrently safe because the
	// nodeQueue is serviced by only one go-routine, so node events are not
	// processed concurrently.
	lastSyncedNodes []*v1.Node
}
// New builds a service Controller that keeps cloud provider service resources
// (like load balancers) in sync with the registry.
//
// It wires event handlers onto the given service and node informers so that
// relevant changes are pushed onto the controller's work queues, and then
// resolves the cloud's load balancer implementation via init. An error is
// returned when init fails (no cloud provider, or no load balancer support).
func New(
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	serviceInformer coreinformers.ServiceInformer,
	nodeInformer coreinformers.NodeInformer,
	clusterName string,
	featureGate featuregate.FeatureGate,
) (*Controller, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

	registerMetrics()

	ctrl := &Controller{
		cloud:            cloud,
		kubeClient:       kubeClient,
		clusterName:      clusterName,
		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    eventRecorder,
		nodeLister:       nodeInformer.Lister(),
		nodeListerSynced: nodeInformer.Informer().HasSynced,
		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
		lastSyncedNodes:  []*v1.Node{},
	}

	serviceHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(cur interface{}) {
			svc, isService := cur.(*v1.Service)
			if !isService {
				return
			}
			// Checking for cleanup here provides a remedy when the controller
			// failed to handle changes before exiting (e.g. crash, restart).
			if wantsLoadBalancer(svc) || needsCleanup(svc) {
				ctrl.enqueueService(cur)
			}
		},
		UpdateFunc: func(old, cur interface{}) {
			oldSvc, oldOK := old.(*v1.Service)
			curSvc, curOK := cur.(*v1.Service)
			if !oldOK || !curOK {
				return
			}
			if ctrl.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc) {
				ctrl.enqueueService(cur)
			}
		},
		// No DeleteFunc: deletion is handled by the update path when the
		// deletion timestamp is added.
	}
	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(serviceHandlers, serviceSyncPeriod)
	ctrl.serviceLister = serviceInformer.Lister()
	ctrl.serviceListerSynced = serviceInformer.Informer().HasSynced

	nodeHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(cur interface{}) {
			ctrl.enqueueNode(cur)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldNode, oldOK := old.(*v1.Node)
			if !oldOK {
				return
			}
			curNode, curOK := cur.(*v1.Node)
			if !curOK {
				return
			}
			if shouldSyncUpdatedNode(oldNode, curNode) {
				ctrl.enqueueNode(curNode)
			}
		},
		DeleteFunc: func(old interface{}) {
			ctrl.enqueueNode(old)
		},
	}
	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(nodeHandlers, nodeSyncPeriod)

	if err := ctrl.init(); err != nil {
		return nil, err
	}
	return ctrl, nil
}
// enqueueService adds the key of obj to the service work queue.
// obj could be a *v1.Service, or a DeletionFinalStateUnknown marker item.
// When no key can be derived the error is reported and nothing is queued.
func (c *Controller) enqueueService(obj interface{}) {
	svcKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr == nil {
		c.serviceQueue.Add(svcKey)
		return
	}
	runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, keyErr))
}
// enqueueNode adds the key of obj to the node work queue.
// obj could be a *v1.Node, or a DeletionFinalStateUnknown marker item.
// When no key can be derived the error is reported and nothing is queued.
func (c *Controller) enqueueNode(obj interface{}) {
	nodeKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr == nil {
		c.nodeQueue.Add(nodeKey)
		return
	}
	runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, keyErr))
}
// Run starts the controller and blocks until ctx is cancelled.
//
// It starts event recording, waits for the service and node informer caches
// to sync, then launches `workers` goroutines draining the service queue and
// exactly one goroutine draining the node queue. Both queues and the event
// broadcaster are shut down when Run returns.
//
// serviceSyncPeriod controls how often we check the cluster's services to
// ensure that the correct load balancers exist.
// nodeSyncPeriod controls how often we check the cluster's nodes to determine
// if load balancers need to be updated to point to a new set.
//
// It's an error to call Run() more than once for a given ServiceController
// object.
func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
	defer runtime.HandleCrash()
	defer c.serviceQueue.ShutDown()
	defer c.nodeQueue.ShutDown()

	// Start event processing pipeline.
	c.eventBroadcaster.StartStructuredLogging(0)
	eventSink := &v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")}
	c.eventBroadcaster.StartRecordingToSink(eventSink)
	defer c.eventBroadcaster.Shutdown()

	klog.Info("Starting service controller")
	defer klog.Info("Shutting down service controller")

	controllerManagerMetrics.ControllerStarted("service")
	defer controllerManagerMetrics.ControllerStopped("service")

	synced := cache.WaitForNamedCacheSync("service", ctx.Done(), c.serviceListerSynced, c.nodeListerSynced)
	if !synced {
		return
	}

	for remaining := workers; remaining > 0; remaining-- {
		go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
	}

	// A single goroutine services node events, which ensures only one node
	// event is processed at any given moment in time.
	runNodeWorker := func(ctx context.Context) {
		c.nodeWorker(ctx, workers)
	}
	go wait.UntilWithContext(ctx, runNodeWorker, time.Second)

	<-ctx.Done()
}
// serviceWorker keeps processing items from the service queue until
// processNextServiceItem reports that there is nothing more to do.
func (c *Controller) serviceWorker(ctx context.Context) {
	for {
		if !c.processNextServiceItem(ctx) {
			return
		}
	}
}
// nodeWorker keeps processing items from the node queue until
// processNextNodeItem reports that there is nothing more to do. workers is
// forwarded as the parallelism to use for each node sync.
func (c *Controller) nodeWorker(ctx context.Context, workers int) {
	for {
		if !c.processNextNodeItem(ctx, workers) {
			return
		}
	}
}
// processNextNodeItem takes one key off the node queue and runs a node sync
// for it. Every service that the sync reports as needing a retry is added to
// the service queue. It returns false only when the node queue signals quit.
func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool {
	nodeKey, shutdown := c.nodeQueue.Get()
	if shutdown {
		return false
	}
	defer c.nodeQueue.Done(nodeKey)

	retryKeys := c.syncNodes(ctx, workers)
	for svcKey := range retryKeys {
		c.serviceQueue.Add(svcKey)
	}

	c.nodeQueue.Forget(nodeKey)
	return true
}
// processNextServiceItem takes one key off the service queue and syncs it.
//
// On success the key is forgotten by the rate limiter. On failure the key is
// re-queued: after the delay carried by an *api.RetryError when the error
// chain contains one, otherwise through the rate limiter. It returns false
// only when the service queue signals quit.
func (c *Controller) processNextServiceItem(ctx context.Context) bool {
	key, shutdown := c.serviceQueue.Get()
	if shutdown {
		return false
	}
	defer c.serviceQueue.Done(key)

	syncErr := c.syncService(ctx, key.(string))

	var retryErr *api.RetryError
	switch {
	case syncErr == nil:
		c.serviceQueue.Forget(key)
	case errors.As(syncErr, &retryErr):
		klog.Warningf("error processing service %v (retrying in %s): %v", key, retryErr.RetryAfter(), syncErr)
		c.serviceQueue.AddAfter(key, retryErr.RetryAfter())
	default:
		runtime.HandleError(fmt.Errorf("error processing service %v (retrying with exponential backoff): %v", key, syncErr))
		c.serviceQueue.AddRateLimited(key)
	}
	return true
}
// init resolves the cloud provider's load balancer implementation and stores
// it in c.balancer. It fails when no cloud provider was given or when the
// provider reports that it does not support load balancers.
func (c *Controller) init() error {
	if c.cloud == nil {
		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
	}

	if lb, supported := c.cloud.LoadBalancer(); supported {
		c.balancer = lb
		return nil
	}
	return fmt.Errorf("the cloud provider does not support external load balancers")
}
// processServiceCreateOrUpdate reconciles the load balancer for the incoming
// service and keeps the service cache entry for key up to date.
// Returns an error if processing the service update failed.
func (c *Controller) processServiceCreateOrUpdate(ctx context.Context, service *v1.Service, key string) error {
	// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
	// path. Ref https://github.com/kubernetes/enhancements/issues/980.
	entry := c.cache.getOrCreate(key)

	// A cached state with a different UID means the service was deleted and
	// re-created in a short period, which is only possible when it doesn't
	// contain the finalizer. Tear down the old load balancer first.
	if prev := entry.state; prev != nil && prev.UID != service.UID {
		if err := c.processLoadBalancerDelete(ctx, prev, key); err != nil {
			return err
		}
	}

	// Always cache the service: the info is needed for service deletion when
	// load balancer cleanup is not handled via finalizer.
	entry.state = service

	op, err := c.syncLoadBalancerIfNeeded(ctx, service, key)
	if err != nil {
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed", "Error syncing load balancer: %v", err)
		return err
	}

	// Only drop the cache entry upon successful load balancer deletion.
	if op == deleteLoadBalancer {
		c.cache.delete(key)
	}
	return nil
}
// loadBalancerOperation identifies which action syncLoadBalancerIfNeeded
// took (or attempted) for a service.
type loadBalancerOperation int

const (
	// deleteLoadBalancer: the load balancer was (or was to be) removed.
	// It is the zero value of loadBalancerOperation.
	deleteLoadBalancer loadBalancerOperation = iota
	// ensureLoadBalancer: the load balancer was (or was to be) created or updated.
	ensureLoadBalancer
)
// syncLoadBalancerIfNeeded ensures that service's status is synced up with the
// load balancer: it creates or updates the load balancer if the service wants
// one, and deletes it if the service no longer wants one or needs cleanup.
//
// The returned loadBalancerOperation tells the caller which of the two paths
// was taken; it is valid even when an error is returned. An error wrapping a
// RetryError from the ensure path is wrapped with %w so callers can detect it
// with errors.As. cloudprovider.ImplementedElsewhere (possibly wrapped) is
// treated as "handled by another controller" and is not reported as an error.
func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.Service, key string) (loadBalancerOperation, error) {
	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
	// which may involve service interruption. Also, we would like user-friendly events.

	// Save the state so we can avoid a write if it doesn't change
	previousStatus := service.Status.LoadBalancer.DeepCopy()
	var newStatus *v1.LoadBalancerStatus
	var op loadBalancerOperation
	var err error

	if !wantsLoadBalancer(service) || needsCleanup(service) {
		// Delete the load balancer if service no longer wants one, or if service needs cleanup.
		op = deleteLoadBalancer
		newStatus = &v1.LoadBalancerStatus{}
		_, exists, getErr := c.balancer.GetLoadBalancer(ctx, c.clusterName, service)
		if getErr != nil {
			return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", getErr)
		}
		if exists {
			klog.V(2).Infof("Deleting existing load balancer for service %s", key)
			c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
			if delErr := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service); delErr != nil {
				// errors.Is also matches a wrapped sentinel, which a plain
				// == comparison would miss.
				if !errors.Is(delErr, cloudprovider.ImplementedElsewhere) {
					return op, fmt.Errorf("failed to delete load balancer: %v", delErr)
				}
				klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error on deletion", key, c.cloud.ProviderName())
			}
		}
		// Always remove finalizer when load balancer is deleted, this ensures Services
		// can be deleted after all corresponding load balancer resources are deleted.
		if finErr := c.removeFinalizer(service); finErr != nil {
			return op, fmt.Errorf("failed to remove load balancer cleanup finalizer: %v", finErr)
		}
		c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	} else {
		// Create or update the load balancer if service wants one.
		op = ensureLoadBalancer
		klog.V(2).Infof("Ensuring load balancer for service %s", key)
		c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
		// Always add a finalizer prior to creating load balancers, this ensures Services
		// can't be deleted until all corresponding load balancer resources are also deleted.
		if finErr := c.addFinalizer(service); finErr != nil {
			return op, fmt.Errorf("failed to add load balancer cleanup finalizer: %v", finErr)
		}
		newStatus, err = c.ensureLoadBalancer(ctx, service)
		if err != nil {
			if errors.Is(err, cloudprovider.ImplementedElsewhere) {
				// ImplementedElsewhere indicates that the ensureLoadBalancer is a nop and the
				// functionality is implemented by a different controller. In this case, we
				// return immediately without doing anything.
				klog.V(4).Infof("LoadBalancer for service %s implemented by a different controller %s, Ignoring error", key, c.cloud.ProviderName())
				return op, nil
			}
			// Use %w deliberately so that a returned RetryError can be handled.
			return op, fmt.Errorf("failed to ensure load balancer: %w", err)
		}
		if newStatus == nil {
			return op, fmt.Errorf("service status returned by EnsureLoadBalancer is nil")
		}
		c.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
	}

	if patchErr := c.patchStatus(service, previousStatus, newStatus); patchErr != nil {
		// Only retry error that isn't not found:
		// - Not found error mostly happens when service disappears right after
		//   we remove the finalizer.
		// - We can't patch status on non-exist service anyway.
		if !apierrors.IsNotFound(patchErr) {
			return op, fmt.Errorf("failed to update load balancer status: %v", patchErr)
		}
	}
	return op, nil
}
// ensureLoadBalancer lists the nodes that pass the service's node predicates
// and asks the cloud balancer to ensure a load balancer backed by them.
// A warning event is recorded when no node qualifies; the ensure call is
// still made in that case.
func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
	predicates := getNodePredicatesForService(service)
	backends, listErr := listWithPredicates(c.nodeLister, predicates...)
	if listErr != nil {
		return nil, listErr
	}

	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
	if len(backends) == 0 {
		c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
	}

	// - Only one protocol supported per service
	// - Not all cloud providers support all protocols and the next step is expected to return
	//   an error for unsupported protocols
	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, backends)
}
// ListKeys returns every key currently held in the cache, in map iteration
// order. It implements the interface required by DeltaFIFO to list the keys
// we already know about.
func (s *serviceCache) ListKeys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	keys := make([]string, len(s.serviceMap))
	next := 0
	for key := range s.serviceMap {
		keys[next] = key
		next++
	}
	return keys
}
// GetByKey looks up key in the serviceMap. It returns the stored
// *cachedService and true when present, or nil and false otherwise; the
// error result is always nil.
func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	entry, found := s.serviceMap[key]
	if !found {
		return nil, false, nil
	}
	return entry, true, nil
}
// allServices returns the cached state of every entry in the cache, in map
// iteration order. An entry whose state has not been set yet contributes a
// nil *v1.Service, so callers must tolerate nil elements.
func (s *serviceCache) allServices() []*v1.Service {
	s.mu.RLock()
	defer s.mu.RUnlock()

	states := make([]*v1.Service, len(s.serviceMap))
	next := 0
	for _, entry := range s.serviceMap {
		states[next] = entry.state
		next++
	}
	return states
}
// get returns the cache entry stored under serviceName and whether it exists.
func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	entry, found := s.serviceMap[serviceName]
	return entry, found
}
// getOrCreate returns the cache entry stored under serviceName, inserting a
// new empty entry (nil state) first when none exists.
func (s *serviceCache) getOrCreate(serviceName string) *cachedService {
	s.mu.Lock()
	defer s.mu.Unlock()

	if existing, found := s.serviceMap[serviceName]; found {
		return existing
	}
	created := &cachedService{}
	s.serviceMap[serviceName] = created
	return created
}
// set stores service under serviceName, replacing any existing entry.
func (s *serviceCache) set(serviceName string, service *cachedService) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.serviceMap[serviceName] = service
}
// delete removes the entry stored under serviceName, if any.
func (s *serviceCache) delete(serviceName string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.serviceMap, serviceName)
}
// needsCleanup reports whether the service's load balancer must be cleaned
// up, as indicated by the load balancer finalizer. That is the case when the
// service carries the finalizer and is either being deleted or is no longer
// of type LoadBalancer.
func needsCleanup(service *v1.Service) bool {
	if !servicehelper.HasLBFinalizer(service) {
		return false
	}

	beingDeleted := service.ObjectMeta.DeletionTimestamp != nil
	// A service that doesn't want a load balancer but still owns the load
	// balancer finalizer also needs to be cleaned up.
	return beingDeleted || service.Spec.Type != v1.ServiceTypeLoadBalancer
}
// needsUpdate reports whether the change from oldService to newService
// requires the load balancer to be re-synced. It returns false when neither
// version wants a load balancer. For several of the compared attributes a
// Normal event describing the change is recorded on newService before
// returning true. Checks run in a fixed order and stop at the first
// difference found.
func (c *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
	oldWantsLB := wantsLoadBalancer(oldService)
	newWantsLB := wantsLoadBalancer(newService)

	if !oldWantsLB && !newWantsLB {
		return false
	}
	if oldWantsLB != newWantsLB {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
			oldService.Spec.Type, newService.Spec.Type)
		return true
	}

	if newWantsLB && !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadBalancerSourceRanges", "%v -> %v",
			oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges)
		return true
	}

	// Ports and session affinity changes trigger a sync without an event.
	if !portsEqualForLB(oldService, newService) || oldService.Spec.SessionAffinity != newService.Spec.SessionAffinity {
		return true
	}
	if !reflect.DeepEqual(oldService.Spec.SessionAffinityConfig, newService.Spec.SessionAffinityConfig) {
		return true
	}

	if !loadBalancerIPsAreEqual(oldService, newService) {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
		return true
	}

	oldIPs, newIPs := oldService.Spec.ExternalIPs, newService.Spec.ExternalIPs
	if len(oldIPs) != len(newIPs) {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
			len(oldIPs), len(newIPs))
		return true
	}
	// Lengths are equal here, so indexing newIPs with i is in range.
	for i, oldIP := range oldIPs {
		if oldIP != newIPs[i] {
			c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
				newIPs[i])
			return true
		}
	}

	if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
		return true
	}

	if oldService.UID != newService.UID {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
			oldService.UID, newService.UID)
		return true
	}

	if oldService.Spec.ExternalTrafficPolicy != newService.Spec.ExternalTrafficPolicy {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalTrafficPolicy", "%v -> %v",
			oldService.Spec.ExternalTrafficPolicy, newService.Spec.ExternalTrafficPolicy)
		return true
	}

	if oldService.Spec.HealthCheckNodePort != newService.Spec.HealthCheckNodePort {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "HealthCheckNodePort", "%v -> %v",
			oldService.Spec.HealthCheckNodePort, newService.Spec.HealthCheckNodePort)
		return true
	}

	// User can upgrade (add another clusterIP or ipFamily) or can downgrade (remove secondary clusterIP or ipFamily),
	// but CAN NOT change primary/secondary clusterIP || ipFamily UNLESS they are changing from/to/ON ExternalName
	// so not care about order, only need check the length.
	oldFamilies, newFamilies := len(oldService.Spec.IPFamilies), len(newService.Spec.IPFamilies)
	if oldFamilies != newFamilies {
		c.eventRecorder.Eventf(newService, v1.EventTypeNormal, "IPFamilies", "Count: %v -> %v",
			oldFamilies, newFamilies)
		return true
	}

	return false
}
// getPortsForLB returns pointers to each element of service.Spec.Ports, in
// order. The pointers alias the service's own port slice; the result is never
// nil, even when the service has no ports.
func getPortsForLB(service *v1.Service) []*v1.ServicePort {
	ports := make([]*v1.ServicePort, len(service.Spec.Ports))
	for i := range service.Spec.Ports {
		ports[i] = &service.Spec.Ports[i]
	}
	return ports
}
// portsEqualForLB reports whether services x and y expose the same ports, in
// the same order, as far as the load balancer is concerned.
func portsEqualForLB(x, y *v1.Service) bool {
	return portSlicesEqualForLB(getPortsForLB(x), getPortsForLB(y))
}
// portSlicesEqualForLB reports whether x and y have the same length and
// every pair of ports at the same index is equal per portEqualForLB.
func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
	if len(x) != len(y) {
		return false
	}
	for i, xPort := range x {
		if !portEqualForLB(xPort, y[i]) {
			return false
		}
	}
	return true
}
// portEqualForLB reports whether two service ports are equal for load
// balancer purposes. Name, Protocol, Port, NodePort and TargetPort are
// compared by value; AppProtocol is compared with reflect.DeepEqual.
func portEqualForLB(x, y *v1.ServicePort) bool {
	return x.Name == y.Name &&
		x.Protocol == y.Protocol &&
		x.Port == y.Port &&
		x.NodePort == y.NodePort &&
		x.TargetPort == y.TargetPort &&
		reflect.DeepEqual(x.AppProtocol, y.AppProtocol)
}
// serviceKeys returns the set of namespace/name keys for the given services.
//
// Nil entries are skipped: the service cache can hand out nil states for
// entries that have not been populated yet, and there is nothing to re-queue
// for them. A service whose key cannot be computed is skipped as well rather
// than inserting an empty key into the set, since an empty key could never be
// resolved back to a service.
func serviceKeys(services []*v1.Service) sets.String {
	ret := sets.NewString()
	for _, service := range services {
		if service == nil {
			continue
		}
		key, err := cache.MetaNamespaceKeyFunc(service)
		if err != nil {
			// No usable key means the service cannot be queued; skip it.
			continue
		}
		ret.Insert(key)
	}
	return ret
}
// nodeNames returns the set of names of the given nodes.
func nodeNames(nodes []*v1.Node) sets.String {
	names := sets.NewString()
	for i := range nodes {
		names.Insert(nodes[i].Name)
	}
	return names
}
// shouldSyncUpdatedNode reports whether an update from oldNode to newNode
// should trigger a node sync.
func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
	// Evaluate the individual node exclusion predicate before the compounded
	// result of all predicates. Changes to the readiness condition are not
	// synced for eTP:Local services or when StableLoadBalancerNodeSet is
	// enabled, so if a node remains NotReady and a user adds the exclusion
	// label we still need to sync to reflect that on ETP=local services.
	// The sync function compares lastSyncedNodes with the new (existing) set
	// of nodes for each service, so services synced with the same set of
	// nodes are skipped internally. This triggers a global sync for all
	// services and makes sure no service gets skipped due to a changing node
	// predicate. The providerID predicate is checked for the same reason.
	includedChanged := respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate)
	if includedChanged || respectsPredicates(oldNode, nodeHasProviderIDPredicate) != respectsPredicates(newNode, nodeHasProviderIDPredicate) {
		return true
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.StableLoadBalancerNodeSet) {
		return false
	}
	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
}
// syncNodes updates the hosts pointed to by the load balancers of all cached
// services, and returns the keys of the services whose update should be
// retried. The time taken is logged and observed in nodeSyncLatency.
func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
	begin := time.Now()
	defer func() {
		elapsed := time.Since(begin).Seconds()
		klog.V(4).Infof("It took %v seconds to finish syncNodes", elapsed)
		nodeSyncLatency.Observe(elapsed)
	}()

	klog.V(2).Infof("Syncing backends for all LB services.")
	services := c.cache.allServices()
	total := len(services)
	failed := c.updateLoadBalancerHosts(ctx, services, workers)
	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
		total-len(failed), total)
	return failed
}
// nodeSyncService syncs the nodes for one load balancer type service. The
// return value indicates whether the caller should retry: false when the
// load balancer was updated successfully or no update was needed (nil
// service, service not wanting a load balancer, or an unchanged node set
// after filtering); true when an update was attempted and failed.
func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
	if svc == nil || !wantsLoadBalancer(svc) {
		return false
	}

	wanted := filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
	previous := filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
	if nodeNames(wanted).Equal(nodeNames(previous)) {
		return false
	}

	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
	updateErr := c.lockedUpdateLoadBalancerHosts(svc, wanted)
	if updateErr == nil {
		klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
		return false
	}

	runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, updateErr))
	nodeSyncErrorCount.Inc()
	return true
}
// updateLoadBalancerHosts updates all given load balancers so that they
// match the latest list of nodes, using the given number of workers.
// It returns the keys of the services that couldn't be updated; when the
// node list itself cannot be retrieved, every service is returned for retry.
// On the normal path c.lastSyncedNodes is replaced with the listed nodes.
func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)

	// Include all nodes and let nodeSyncService filter and figure out if
	// the update is relevant for the service in question.
	currentNodes, listErr := listWithPredicates(c.nodeLister)
	if listErr != nil {
		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", listErr))
		return serviceKeys(services)
	}

	servicesToRetry = sets.NewString()
	// retryMu guards servicesToRetry, which the workers write concurrently.
	var retryMu sync.Mutex

	syncOne := func(idx int) {
		svc := services[idx]
		if !c.nodeSyncService(svc, c.lastSyncedNodes, currentNodes) {
			return
		}
		retryMu.Lock()
		defer retryMu.Unlock()
		servicesToRetry.Insert(fmt.Sprintf("%s/%s", svc.Namespace, svc.Name))
	}
	workqueue.ParallelizeUntil(ctx, workers, len(services), syncOne)

	c.lastSyncedNodes = currentNodes
	klog.V(4).Infof("Finished updateLoadBalancerHosts")
	return servicesToRetry
}
// lockedUpdateLoadBalancerHosts updates the load balancer of a service with
// the given hosts, assuming we hold the mutex associated with the service.
//
// It returns nil when the update succeeded, when the cloud reports
// cloudprovider.ImplementedElsewhere (possibly wrapped), or when the update
// failed but the load balancer turns out not to exist. Otherwise it records
// a warning event and returns the update error. The sync count and update
// latency metrics are recorded on every call.
//
// The method has no context parameter, so the cloud calls are made with
// context.TODO().
func (c *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
	startTime := time.Now()
	loadBalancerSyncCount.Inc()
	defer func() {
		latency := time.Since(startTime).Seconds()
		klog.V(4).Infof("It took %v seconds to update load balancer hosts for service %s/%s", latency, service.Namespace, service.Name)
		updateLoadBalancerHostLatency.Observe(latency)
	}()

	klog.V(2).Infof("Updating backends for load balancer %s/%s with node set: %v", service.Namespace, service.Name, nodeNames(hosts))
	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
	err := c.balancer.UpdateLoadBalancer(context.TODO(), c.clusterName, service, hosts)
	if err == nil {
		// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
		if len(hosts) == 0 {
			c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
		} else {
			c.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
		}
		return nil
	}

	// ImplementedElsewhere indicates that the UpdateLoadBalancer is a nop and the
	// functionality is implemented by a different controller. In this case, we
	// return immediately without doing anything. errors.Is also matches a
	// wrapped sentinel, which a plain == comparison would miss.
	if errors.Is(err, cloudprovider.ImplementedElsewhere) {
		return nil
	}

	// It's only an actual error if the load balancer still exists. A failure
	// of the existence check itself is reported, and the original update
	// error is still returned below.
	_, exists, getErr := c.balancer.GetLoadBalancer(context.TODO(), c.clusterName, service)
	if getErr != nil {
		runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, getErr))
	} else if !exists {
		return nil
	}

	c.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
	return err
}
// wantsLoadBalancer reports whether the service is of type LoadBalancer and
// has no LoadBalancerClass set.
func wantsLoadBalancer(service *v1.Service) bool {
	if service.Spec.Type != v1.ServiceTypeLoadBalancer {
		return false
	}
	// A set LoadBalancerClass means the user does not want the default
	// cloud-provider load balancer.
	return service.Spec.LoadBalancerClass == nil
}
// loadBalancerIPsAreEqual reports whether both services carry the same
// Spec.LoadBalancerIP value.
func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
	oldIP, newIP := oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP
	return oldIP == newIP
}
// syncService reconciles the Service identified by key (a "namespace/name"
// string). If the lister no longer has the service, the deletion path is
// taken; otherwise a deep copy of the service goes through the
// create-or-update path. Any other lister error is reported and returned.
// This function is not meant to be invoked concurrently with the same key.
func (c *Controller) syncService(ctx context.Context, key string) error {
	begin := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(begin))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// svc holds the latest service info known to the lister.
	svc, err := c.serviceLister.Services(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		// Absence from the store means the watcher caught the deletion;
		// make sure the load balancer info is cleaned up.
		return c.processServiceDeletion(ctx, key)
	}
	if err != nil {
		runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err))
		return err
	}

	// Objects returned from an informer must not be modified. Reconcilers
	// may modify the service, so they are given a copy.
	return c.processServiceCreateOrUpdate(ctx, svc.DeepCopy(), key)
}
// processServiceDeletion cleans up the load balancer of a deleted service,
// using the state kept in the controller cache under key. The cache entry is
// removed only after the cleanup succeeded; on failure the error is returned
// and the entry is kept.
func (c *Controller) processServiceDeletion(ctx context.Context, key string) error {
	cached, found := c.cache.get(key)
	if !found {
		// No cache entry means one of:
		// - no load balancer was ever created for the deleted service, or
		// - the load balancer created for it has already been deleted.
		// Either way there is nothing left to do.
		return nil
	}

	klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
	err := c.processLoadBalancerDelete(ctx, cached.state, key)
	if err == nil {
		c.cache.delete(key)
	}
	return err
}
// processLoadBalancerDelete asks the cloud provider to make sure the load
// balancer of the service is deleted, and records events for the start and
// the outcome. Services for which wantsLoadBalancer is false are ignored.
func (c *Controller) processLoadBalancerDelete(ctx context.Context, service *v1.Service, key string) error {
	// Only services that want a load balancer have anything to delete.
	if !wantsLoadBalancer(service) {
		return nil
	}

	c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
	deleteErr := c.balancer.EnsureLoadBalancerDeleted(ctx, c.clusterName, service)
	if deleteErr != nil {
		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", deleteErr)
		return deleteErr
	}
	c.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
	return nil
}
// addFinalizer patches the service so that it carries the load balancer
// cleanup finalizer. It is a no-op when the finalizer is already present.
func (c *Controller) addFinalizer(service *v1.Service) error {
	if servicehelper.HasLBFinalizer(service) {
		return nil
	}

	// Work on a copy so the shared informer cache is not mutated.
	withFinalizer := service.DeepCopy()
	withFinalizer.ObjectMeta.Finalizers = append(withFinalizer.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)

	klog.V(2).Infof("Adding finalizer to service %s/%s", withFinalizer.Namespace, withFinalizer.Name)
	if _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, withFinalizer); err != nil {
		return err
	}
	return nil
}
// removeFinalizer patches the service so that it no longer carries the load
// balancer cleanup finalizer. It is a no-op when the finalizer is absent.
func (c *Controller) removeFinalizer(service *v1.Service) error {
	if !servicehelper.HasLBFinalizer(service) {
		return nil
	}

	// Work on a copy so the shared informer cache is not mutated.
	withoutFinalizer := service.DeepCopy()
	withoutFinalizer.ObjectMeta.Finalizers = removeString(withoutFinalizer.ObjectMeta.Finalizers, servicehelper.LoadBalancerCleanupFinalizer)

	klog.V(2).Infof("Removing finalizer from service %s/%s", withoutFinalizer.Namespace, withoutFinalizer.Name)
	if _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, withoutFinalizer); err != nil {
		return err
	}
	return nil
}
// removeString builds a fresh []string holding every element of slice that
// differs from s, in the original order. The input is never modified. The
// result is nil when no element is kept.
func removeString(slice []string, s string) []string {
	var kept []string
	for i := range slice {
		if slice[i] == s {
			continue
		}
		kept = append(kept, slice[i])
	}
	return kept
}
// patchStatus patches the service so that its load balancer status becomes
// newStatus. Nothing is sent when previousStatus and newStatus are already
// equal according to servicehelper.LoadBalancerStatusEqual.
func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus *v1.LoadBalancerStatus) error {
	if servicehelper.LoadBalancerStatusEqual(previousStatus, newStatus) {
		return nil
	}

	// Work on a copy so the shared informer cache is not mutated.
	withStatus := service.DeepCopy()
	withStatus.Status.LoadBalancer = *newStatus

	klog.V(2).Infof("Patching status for service %s/%s", withStatus.Namespace, withStatus.Name)
	if _, err := servicehelper.PatchService(c.kubeClient.CoreV1(), service, withStatus); err != nil {
		return err
	}
	return nil
}
// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
// some set of criteria defined by the function. It returns true when the node
// meets the criteria. Predicates are grouped into slices such as
// allNodePredicates, and getNodePredicatesForService picks the slice that
// applies to a service.
type NodeConditionPredicate func(node *v1.Node) bool
// Predicate sets handed out by getNodePredicatesForService.
var (
	// allNodePredicates is the default set, used when no other set applies.
	allNodePredicates = []NodeConditionPredicate{
		nodeIncludedPredicate,
		nodeUnTaintedPredicate,
		nodeReadyPredicate,
		nodeHasProviderIDPredicate,
	}

	// etpLocalNodePredicates is used for services whose ExternalTrafficPolicy
	// is Local. Compared to allNodePredicates it leaves out nodeReadyPredicate.
	etpLocalNodePredicates = []NodeConditionPredicate{
		nodeIncludedPredicate,
		nodeUnTaintedPredicate,
		nodeHasProviderIDPredicate,
	}

	// stableNodeSetPredicates is used for every service when the
	// StableLoadBalancerNodeSet feature gate is enabled.
	stableNodeSetPredicates = []NodeConditionPredicate{
		nodeNotDeletedPredicate,
		nodeIncludedPredicate,
		nodeHasProviderIDPredicate,
		// This is not perfect, but probably good enough. A newly added taint
		// does not by itself trigger an LB update (see shouldSyncUpdatedNode),
		// but whenever anything else causes an LB sync, tainted nodes are
		// excluded at that time, so connections on those nodes will not be
		// drained.
		nodeUnTaintedPredicate,
	}
)
// getNodePredicatesForService selects the predicate set for the service:
// stableNodeSetPredicates when the StableLoadBalancerNodeSet feature gate is
// enabled, otherwise etpLocalNodePredicates for services with
// ExternalTrafficPolicy Local, and allNodePredicates for everything else.
func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
	switch {
	case utilfeature.DefaultFeatureGate.Enabled(features.StableLoadBalancerNodeSet):
		// The feature gate takes precedence over the traffic policy.
		return stableNodeSetPredicates
	case service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal:
		return etpLocalNodePredicates
	default:
		return allNodePredicates
	}
}
// nodeIncludedPredicate accepts a node for load balancing unless it carries
// the v1.LabelNodeExcludeBalancers label. Only the presence of the label
// matters; its value is ignored.
func nodeIncludedPredicate(node *v1.Node) bool {
	if _, excluded := node.Labels[v1.LabelNodeExcludeBalancers]; excluded {
		return false
	}
	return true
}
// nodeHasProviderIDPredicate accepts a node only when its Spec.ProviderID is
// non-empty.
func nodeHasProviderIDPredicate(node *v1.Node) bool {
	return len(node.Spec.ProviderID) > 0
}