diff --git a/pkg/nodemanager/BUILD.bazel b/pkg/nodemanager/BUILD.bazel index ae977605ce..6112056775 100644 --- a/pkg/nodemanager/BUILD.bazel +++ b/pkg/nodemanager/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", @@ -19,6 +20,7 @@ go_library( "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", "//vendor/k8s.io/cloud-provider:go_default_library", + "//vendor/k8s.io/cloud-provider/node/helpers:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/kubernetes/pkg/kubelet/apis:go_default_library", "//vendor/k8s.io/kubernetes/pkg/scheduler/api:go_default_library", diff --git a/pkg/nodemanager/nodemanager.go b/pkg/nodemanager/nodemanager.go index 4e345a12cb..fc0688b5a3 100644 --- a/pkg/nodemanager/nodemanager.go +++ b/pkg/nodemanager/nodemanager.go @@ -24,7 +24,9 @@ import ( "time" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -36,19 +38,13 @@ import ( "k8s.io/client-go/tools/record" clientretry "k8s.io/client-go/util/retry" cloudprovider "k8s.io/cloud-provider" + cloudnodeutil "k8s.io/cloud-provider/node/helpers" "k8s.io/klog" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" nodeutil "k8s.io/kubernetes/pkg/util/node" ) -// UpdateNodeSpecBackoff defines the backoff for node spec updates. -var UpdateNodeSpecBackoff = wait.Backoff{ - Steps: 20, - Duration: 50 * time.Millisecond, - Jitter: 1.0, -} - // NodeProvider defines the interfaces for node provider. type NodeProvider interface { // NodeAddresses returns the addresses of the specified instance. @@ -61,6 +57,50 @@ type NodeProvider interface { GetZone(ctx context.Context) (cloudprovider.Zone, error) } +// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them. +// primaryKey and secondaryKey are keys of labels to reconcile. +// - If both keys exist, but their values don't match. Use the value from the +// primaryKey as the source of truth to reconcile. +// - If ensureSecondaryExists is true, and the secondaryKey does not +// exist, secondaryKey will be added with the value of the primaryKey. +var labelReconcileInfo = []struct { + primaryKey string + secondaryKey string + ensureSecondaryExists bool +}{ + { + // Reconcile the beta and the GA zone label using the beta label as + // the source of truth + // TODO: switch the primary key to GA labels in v1.21 + primaryKey: v1.LabelZoneFailureDomain, + secondaryKey: v1.LabelZoneFailureDomainStable, + ensureSecondaryExists: true, + }, + { + // Reconcile the beta and the stable region label using the beta label as + // the source of truth + // TODO: switch the primary key to GA labels in v1.21 + primaryKey: v1.LabelZoneRegion, + secondaryKey: v1.LabelZoneRegionStable, + ensureSecondaryExists: true, + }, + { + // Reconcile the beta and the stable instance-type label using the beta label as + // the source of truth + // TODO: switch the primary key to GA labels in v1.21 + primaryKey: v1.LabelInstanceType, + secondaryKey: v1.LabelInstanceTypeStable, + ensureSecondaryExists: true, + }, +} + +// UpdateNodeSpecBackoff is the back configure for node update. +var UpdateNodeSpecBackoff = wait.Backoff{ + Steps: 20, + Duration: 50 * time.Millisecond, + Jitter: 1.0, +} + // CloudNodeController reconciles node information. type CloudNodeController struct { nodeName string @@ -102,8 +142,8 @@ func NewCloudNodeController( // Use shared informer to listen to add/update of nodes. Note that any nodes // that exist before node controller starts will show up in the update method cnc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: cnc.AddCloudNode, - UpdateFunc: cnc.UpdateCloudNode, + AddFunc: func(obj interface{}) { cnc.AddCloudNode(context.TODO(), obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { cnc.UpdateCloudNode(context.TODO(), oldObj, newObj) }, }) return cnc @@ -120,24 +160,84 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) { // very infrequently. DO NOT MODIFY this to perform frequent operations. // Start a loop to periodically update the node addresses obtained from the cloud - wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, stopCh) + wait.Until(func() { cnc.UpdateNodeStatus(context.TODO()) }, cnc.nodeStatusUpdateFrequency, stopCh) } // UpdateNodeStatus updates the node status, such as node addresses -func (cnc *CloudNodeController) UpdateNodeStatus() { - nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"}) +func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) { + nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ + ResourceVersion: "0", + FieldSelector: fields.OneTermEqualSelector("metadata.name", cnc.nodeName).String(), + }) if err != nil { klog.Errorf("Error monitoring node status: %v", err) return } for i := range nodes.Items { - cnc.updateNodeAddress(&nodes.Items[i]) + cnc.updateNodeAddress(ctx, &nodes.Items[i]) + } + + for _, node := range nodes.Items { + err = cnc.reconcileNodeLabels(node.Name) + if err != nil { + klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err) + } + } +} + +// reconcileNodeLabels reconciles node labels transitioning from beta to GA +func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error { + node, err := cnc.nodeInformer.Lister().Get(nodeName) + if err != nil { + // If node not found, just ignore it. + if apierrors.IsNotFound(err) { + return nil + } + + return err + } + + if node.Labels == nil { + // Nothing to reconcile. + return nil + } + + labelsToUpdate := map[string]string{} + for _, r := range labelReconcileInfo { + primaryValue, primaryExists := node.Labels[r.primaryKey] + secondaryValue, secondaryExists := node.Labels[r.secondaryKey] + + if !primaryExists { + // The primary label key does not exist. This should not happen + // within our supported version skew range, when no external + // components/factors modifying the node object. Ignore this case. + continue + } + if secondaryExists && primaryValue != secondaryValue { + // Secondary label exists, but not consistent with the primary + // label. Need to reconcile. + labelsToUpdate[r.secondaryKey] = primaryValue + + } else if !secondaryExists && r.ensureSecondaryExists { + // Apply secondary label based on primary label. + labelsToUpdate[r.secondaryKey] = primaryValue + } + } + + if len(labelsToUpdate) == 0 { + return nil } + + if !cloudnodeutil.AddOrUpdateLabelsOnNode(cnc.kubeClient, labelsToUpdate, node) { + return fmt.Errorf("failed update labels for node %+v", node) + } + + return nil } // UpdateNodeAddress updates the nodeAddress of a single node -func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node) { +func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node) { // Do not process nodes that are still tainted cloudTaint := getCloudTaint(node.Spec.Taints) if cloudTaint != nil { @@ -145,10 +245,19 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node) { return } - // TODO(feiskyer): Node that isn't present according to the cloud provider shouldn't have its address updated - nodeAddresses, err := cnc.nodeProvider.NodeAddresses(context.TODO(), types.NodeName(node.Name)) + // Node that isn't present according to the cloud provider shouldn't have its address updated + exists, err := cnc.ensureNodeExistsByProviderID(ctx, node) if err != nil { + // Continue to update node address when not sure the node is not exists klog.Errorf("%v", err) + } else if !exists { + klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name) + return + } + + nodeAddresses, err := cnc.getNodeAddressesByName(ctx, node) + if err != nil { + klog.Errorf("Error getting node addresses for node %q: %v", node.Name, err) return } @@ -162,6 +271,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node) { for i := range nodeAddresses { if nodeAddresses[i].Type == v1.NodeHostName { hostnameExists = true + break } } // If hostname was not present in cloud provided addresses, use the hostname @@ -177,7 +287,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node) { // it can be found in the cloud as well (consistent with the behaviour in kubelet) if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok { if nodeIP == nil { - klog.Errorf("Specified Node IP not found in cloudprovider") + klog.Errorf("Specified Node IP not found in cloudprovider for node %q", node.Name) return } } @@ -192,8 +302,12 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node) { } } +// nodeModifier is used to carry changes to node objects across multiple attempts to update them +// in a retry-if-conflict loop. +type nodeModifier func(*v1.Node) + // UpdateCloudNode handles node update event. -func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { +func (cnc *CloudNodeController) UpdateCloudNode(ctx context.Context, _, newObj interface{}) { node, ok := newObj.(*v1.Node) if !ok { utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) @@ -211,11 +325,11 @@ func (cnc *CloudNodeController) UpdateCloudNode(_, newObj interface{}) { return } - cnc.initializeNode(node) + cnc.initializeNode(ctx, node) } // AddCloudNode handles initializing new nodes registered with the cloud taint. -func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { +func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{}) { node := obj.(*v1.Node) // Skip other nodes other than cnc.nodeName. @@ -229,79 +343,53 @@ func (cnc *CloudNodeController) AddCloudNode(obj interface{}) { return } - cnc.initializeNode(node) + cnc.initializeNode(ctx, node) } // This processes nodes that were added into the cluster, and cloud initialize them if appropriate -func (cnc *CloudNodeController) initializeNode(node *v1.Node) { - err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { - curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil { - return err - } - - cloudTaint := getCloudTaint(curNode.Spec.Taints) - if cloudTaint == nil { - // Node object received from event had the cloud taint but was outdated, - // the node has actually already been initialized. - return nil - } +func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) { + klog.Infof("Initializing node %s with cloud provider", node.Name) + curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get node %s: %v", node.Name, err)) + return + } - if curNode.Spec.ProviderID == "" { - providerID, err := cnc.nodeProvider.InstanceID(context.TODO(), types.NodeName(curNode.Name)) - if err == nil { - curNode.Spec.ProviderID = providerID - } else { - // we should attempt to set providerID on curNode, but - // we can continue if we fail since we will attempt to set - // node addresses given the node name in getNodeAddressesByProviderIDOrName - klog.Errorf("failed to set node provider id: %v", err) - } - } + cloudTaint := getCloudTaint(curNode.Spec.Taints) + if cloudTaint == nil { + // Node object received from event had the cloud taint but was outdated, + // the node has actually already been initialized. + return + } - nodeAddresses, err := cnc.nodeProvider.NodeAddresses(context.TODO(), types.NodeName(curNode.Name)) - if err != nil { - return err - } + nodeModifiers, err := cnc.getNodeModifiersFromCloudProvider(ctx, curNode) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initialize node %s at cloudprovider: %v", node.Name, err)) + return + } - // If user provided an IP address, ensure that IP address is found - // in the cloud provider before removing the taint on the node - if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok { - if nodeIP == nil { - return errors.New("failed to find kubelet node IP from cloud provider") - } - } + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + n.Spec.Taints = excludeCloudTaint(n.Spec.Taints) + }) - if instanceType, err := cnc.nodeProvider.InstanceType(context.TODO(), types.NodeName(node.Name)); err != nil { + err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error { + curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil { return err - } else if instanceType != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) - curNode.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType } - // Set zones for the node. - zone, err := cnc.nodeProvider.GetZone(context.TODO()) - if err != nil { - return fmt.Errorf("failed to get zone from cloud provider: %v", err) - } - if zone.FailureDomain != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain) - curNode.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain + for _, modify := range nodeModifiers { + modify(curNode) } - if zone.Region != "" { - klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region) - curNode.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region - } - - curNode.Spec.Taints = excludeCloudTaint(curNode.Spec.Taints) _, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode) if err != nil { return err } + // After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses // So that users do not see any significant delay in IP addresses being filled into the node - cnc.updateNodeAddress(curNode) + cnc.updateNodeAddress(ctx, curNode) klog.Infof("Successfully initialized node %s with cloud provider", node.Name) return nil @@ -312,6 +400,86 @@ func (cnc *CloudNodeController) initializeNode(node *v1.Node) { } } +// getNodeModifiersFromCloudProvider returns a slice of nodeModifiers that update +// a node object with provider-specific information. +// All of the returned functions are idempotent, because they are used in a retry-if-conflict +// loop, meaning they could get called multiple times. +func (cnc *CloudNodeController) getNodeModifiersFromCloudProvider(ctx context.Context, node *v1.Node) ([]nodeModifier, error) { + var nodeModifiers []nodeModifier + + if node.Spec.ProviderID == "" { + providerID, err := cnc.nodeProvider.InstanceID(ctx, types.NodeName(node.Name)) + if err == nil { + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Spec.ProviderID == "" { + n.Spec.ProviderID = providerID + } + }) + } else { + // we should attempt to set providerID on node, but + // we can continue if we fail since we will attempt to set + // node addresses given the node name in getNodeAddressesByName + klog.Errorf("failed to set node provider id: %v", err) + } + } + + nodeAddresses, err := cnc.getNodeAddressesByName(ctx, node) + if err != nil { + return nil, err + } + + // If user provided an IP address, ensure that IP address is found + // in the cloud provider before removing the taint on the node + if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok { + if nodeIP == nil { + return nil, errors.New("failed to find kubelet node IP from cloud provider") + } + } + + if instanceType, err := cnc.getInstanceTypeByName(ctx, node); err != nil { + return nil, err + } else if instanceType != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceTypeStable, instanceType) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelInstanceType] = instanceType + n.Labels[v1.LabelInstanceTypeStable] = instanceType + }) + } + + zone, err := cnc.getZoneByName(ctx, node) + if err != nil { + return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err) + } + if zone.FailureDomain != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomainStable, zone.FailureDomain) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain + n.Labels[v1.LabelZoneFailureDomainStable] = zone.FailureDomain + }) + } + if zone.Region != "" { + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region) + klog.V(2).Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegionStable, zone.Region) + nodeModifiers = append(nodeModifiers, func(n *v1.Node) { + if n.Labels == nil { + n.Labels = map[string]string{} + } + n.Labels[v1.LabelZoneRegion] = zone.Region + n.Labels[v1.LabelZoneRegionStable] = zone.Region + }) + } + + return nodeModifiers, nil +} + func getCloudTaint(taints []v1.Taint) *v1.Taint { for _, taint := range taints { if taint.Key == schedulerapi.TaintExternalCloudProvider { @@ -334,11 +502,11 @@ func excludeCloudTaint(taints []v1.Taint) []v1.Taint { // ensureNodeExistsByProviderID checks if the instance exists by the provider id, // If provider id in spec is empty it calls instanceId with node name to get provider id -func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.Node) (bool, error) { +func (cnc *CloudNodeController) ensureNodeExistsByProviderID(ctx context.Context, node *v1.Node) (bool, error) { providerID := node.Spec.ProviderID if providerID == "" { var err error - providerID, err = instances.InstanceID(context.TODO(), types.NodeName(node.Name)) + providerID, err = cnc.nodeProvider.InstanceID(ctx, types.NodeName(node.Name)) if err != nil { if err == cloudprovider.InstanceNotFound { return false, nil @@ -352,7 +520,17 @@ func ensureNodeExistsByProviderID(instances cloudprovider.Instances, node *v1.No } } - return instances.InstanceExistsByProviderID(context.TODO(), providerID) + return true, nil +} + +func (cnc *CloudNodeController) getNodeAddressesByName(ctx context.Context, node *v1.Node) ([]v1.NodeAddress, error) { + nodeAddresses, err := cnc.nodeProvider.NodeAddresses(ctx, types.NodeName(node.Name)) + if err != nil { + if err != nil { + return nil, fmt.Errorf("error fetching node by name %s: %v", node.Name, err) + } + } + return nodeAddresses, nil } func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool { @@ -387,3 +565,26 @@ func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) ( } return nodeIP, nodeIPExists } + +func (cnc *CloudNodeController) getInstanceTypeByName(ctx context.Context, node *v1.Node) (string, error) { + instanceType, err := cnc.nodeProvider.InstanceType(ctx, types.NodeName(node.Name)) + if err != nil { + if err != nil { + return "", fmt.Errorf("InstanceType: Error fetching by NodeName %s: %v", node.Name, err) + } + } + return instanceType, err +} + +// getZoneByName will attempt to get the zone of node using its providerID +// then it's name. If both attempts fail, an error is returned +func (cnc *CloudNodeController) getZoneByName(ctx context.Context, node *v1.Node) (cloudprovider.Zone, error) { + zone, err := cnc.nodeProvider.GetZone(ctx) + if err != nil { + if err != nil { + return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by NodeName %s: %v", node.Name, err) + } + } + + return zone, nil +}