From d9a731bd815d555c93d10f4b34e65bfeaa99cb76 Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Tue, 4 Apr 2023 14:21:08 -0700 Subject: [PATCH] Revert "Revert machine migration changes (#176) (#241)" This reverts commit 9973eac0c293f6c4d9c34fcd696b5f790ca1967e. --- .../templates/karpenter.sh_machines.yaml | 1 + go.mod | 2 +- go.sum | 4 +- pkg/cloudprovider/fake/cloudprovider.go | 3 +- pkg/controllers/consistency/controller.go | 38 ++- pkg/controllers/consistency/events.go | 10 +- pkg/controllers/consistency/failedinit.go | 46 +-- pkg/controllers/consistency/nodeshape.go | 39 +-- pkg/controllers/consistency/suite_test.go | 18 +- pkg/controllers/consistency/termination.go | 5 +- pkg/controllers/controllers.go | 6 +- .../deprovisioning/consolidation.go | 14 +- pkg/controllers/deprovisioning/controller.go | 32 +-- pkg/controllers/deprovisioning/drift.go | 2 +- pkg/controllers/deprovisioning/drift_test.go | 40 ++- pkg/controllers/deprovisioning/emptiness.go | 2 +- .../deprovisioning/events/events.go | 49 +++- .../deprovisioning/expiration_test.go | 34 ++- pkg/controllers/deprovisioning/helpers.go | 6 +- pkg/controllers/deprovisioning/metrics.go | 4 +- pkg/controllers/deprovisioning/suite_test.go | 237 +++++++++------ pkg/controllers/deprovisioning/types.go | 3 + .../machine/lifecycle/registration.go | 3 + pkg/controllers/node/controller.go | 27 +- pkg/controllers/node/drift.go | 3 +- pkg/controllers/node/emptiness.go | 5 - pkg/controllers/node/finalizer.go | 49 ---- pkg/controllers/node/initialization.go | 125 -------- pkg/controllers/node/suite_test.go | 270 +----------------- pkg/controllers/provisioning/provisioner.go | 57 +--- .../provisioning/scheduling/events/events.go | 24 +- .../provisioning/scheduling/scheduler.go | 2 +- .../provisioning/scheduling/suite_test.go | 19 +- pkg/controllers/state/cluster.go | 12 + pkg/controllers/state/suite_test.go | 2 - pkg/controllers/termination/controller.go | 5 + pkg/events/suite_test.go | 16 +- pkg/metrics/metrics.go | 45 ++- pkg/operator/operator.go | 7 + pkg/test/expectations/expectations.go | 24 +- 40 files changed, 492 insertions(+), 798 deletions(-) create mode 120000 charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml delete mode 100644 pkg/controllers/node/finalizer.go delete mode 100644 pkg/controllers/node/initialization.go diff --git a/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml b/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml new file mode 120000 index 0000000000..10e93ba5ee --- /dev/null +++ b/charts/karpenter-core-crd/templates/karpenter.sh_machines.yaml @@ -0,0 +1 @@ +../../pkg/apis/crds/karpenter.sh_machines.yaml \ No newline at end of file diff --git a/go.mod b/go.mod index 10e0ce9e3c..6f6b9b9168 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( k8s.io/csi-translation-lib v0.25.4 k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a - sigs.k8s.io/controller-runtime v0.13.1 + sigs.k8s.io/controller-runtime v0.13.0 ) require ( diff --git a/go.sum b/go.sum index 265509a068..5bcbd954d2 100644 --- a/go.sum +++ b/go.sum @@ -850,8 +850,8 @@ knative.dev/pkg v0.0.0-20221123154742-05b694ec4d3a/go.mod h1:fckNBPf9bu5/p1RbnOh rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.13.1 h1:tUsRCSJVM1QQOOeViGeX3GMT3dQF1eePPw6sEE3xSlg= 
-sigs.k8s.io/controller-runtime v0.13.1/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= +sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRMc2WIQ= +sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 0ee5ab18e1..3396073faa 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -115,10 +115,9 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) ( break } } - name := test.RandomName() created := &v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: machine.Name, Labels: lo.Assign(labels, machine.Labels), Annotations: machine.Annotations, }, diff --git a/pkg/controllers/consistency/controller.go b/pkg/controllers/consistency/controller.go index 0a535d37b0..3eaabea498 100644 --- a/pkg/controllers/consistency/controller.go +++ b/pkg/controllers/consistency/controller.go @@ -30,14 +30,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" + machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) -var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) +var _ corecontroller.TypedController[*v1alpha5.Machine] = (*Controller)(nil) type Controller struct { clock clock.Clock @@ -52,7 +54,7 @@ type Issue string type Check interface { // Check performs the consistency check, this should return a list of slice discovered, or an empty // slice if no issues were found - Check(context.Context, *v1.Node) ([]Issue, error) + Check(context.Context, *v1.Node, *v1alpha5.Machine) ([]Issue, error) } // scanPeriod is how often we inspect and report issues that are found. 
@@ -61,15 +63,15 @@ const scanPeriod = 10 * time.Minute func NewController(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, provider cloudprovider.CloudProvider) corecontroller.Controller { - return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ + return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{ clock: clk, kubeClient: kubeClient, recorder: recorder, lastScanned: cache.New(scanPeriod, 1*time.Minute), checks: []Check{ - NewFailedInit(clk, kubeClient, provider), + NewFailedInit(clk, provider), NewTermination(kubeClient), - NewNodeShape(kubeClient, provider), + NewNodeShape(provider), }}, ) } @@ -78,12 +80,12 @@ func (c *Controller) Name() string { return "consistency" } -func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) { - if node.Labels[v1alpha5.ProvisionerNameLabelKey] == "" { +func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + if machine.Status.ProviderID == "" { return reconcile.Result{}, nil } - // If we get an event before we should check for consistency checks, we ignore and wait - if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(node).String()); ok { + // If we get an event before we should check for inflight checks, we ignore and wait + if lastTime, ok := c.lastScanned.Get(client.ObjectKeyFromObject(machine).String()); ok { if lastTime, ok := lastTime.(time.Time); ok { remaining := scanPeriod - c.clock.Since(lastTime) return reconcile.Result{RequeueAfter: remaining}, nil @@ -91,26 +93,34 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re // the above should always succeed return reconcile.Result{RequeueAfter: scanPeriod}, nil } - c.lastScanned.SetDefault(client.ObjectKeyFromObject(node).String(), c.clock.Now()) + c.lastScanned.SetDefault(client.ObjectKeyFromObject(machine).String(), c.clock.Now()) + node, err := machineutil.NodeForMachine(ctx, c.kubeClient, machine) + if err != nil { + return reconcile.Result{}, machineutil.IgnoreDuplicateNodeError(machineutil.IgnoreNodeNotFoundError(err)) + } for _, check := range c.checks { - issues, err := check.Check(ctx, node) + issues, err := check.Check(ctx, node, machine) if err != nil { return reconcile.Result{}, fmt.Errorf("checking node with %T, %w", check, err) } for _, issue := range issues { logging.FromContext(ctx).Errorf("check failed, %s", issue) consistencyErrors.With(prometheus.Labels{checkLabel: reflect.TypeOf(check).Elem().Name()}).Inc() - c.recorder.Publish(CheckEvent(node, string(issue))...) + c.recorder.Publish(CheckEvent(node, machine, string(issue))...) } } return reconcile.Result{RequeueAfter: scanPeriod}, nil } -func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { +func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { return corecontroller.Adapt(controllerruntime. NewControllerManagedBy(m). - For(&v1.Node{}). + For(&v1alpha5.Machine{}). + Watches( + &source.Kind{Type: &v1.Node{}}, + machineutil.NodeEventHandler(ctx, c.kubeClient), + ). 
WithOptions(controller.Options{MaxConcurrentReconciles: 10}), ) } diff --git a/pkg/controllers/consistency/events.go b/pkg/controllers/consistency/events.go index 011799252f..5526068290 100644 --- a/pkg/controllers/consistency/events.go +++ b/pkg/controllers/consistency/events.go @@ -17,10 +17,11 @@ package consistency import ( v1 "k8s.io/api/core/v1" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func CheckEvent(node *v1.Node, message string) []events.Event { +func CheckEvent(node *v1.Node, machine *v1alpha5.Machine, message string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -29,5 +30,12 @@ func CheckEvent(node *v1.Node, message string) []events.Event { Message: message, DedupeValues: []string{node.Name, message}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeWarning, + Reason: "FailedConsistencyCheck", + Message: message, + DedupeValues: []string{machine.Name, message}, + }, } } diff --git a/pkg/controllers/consistency/failedinit.go b/pkg/controllers/consistency/failedinit.go index b9be389a09..ba8e7a3a9a 100644 --- a/pkg/controllers/consistency/failedinit.go +++ b/pkg/controllers/consistency/failedinit.go @@ -19,15 +19,12 @@ import ( "fmt" "time" - "github.com/samber/lo" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/utils/clock" - "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/controllers/node" + "github.com/aws/karpenter-core/pkg/controllers/machine/lifecycle" ) // initFailureTime is the time after which we start reporting a node as having failed to initialize. This is set @@ -37,51 +34,34 @@ const initFailureTime = time.Hour // FailedInit detects nodes that fail to initialize within an hour and reports the reason for the initialization // failure type FailedInit struct { - clock clock.Clock - kubeClient client.Client - provider cloudprovider.CloudProvider + clock clock.Clock + provider cloudprovider.CloudProvider } -func NewFailedInit(clk clock.Clock, kubeClient client.Client, provider cloudprovider.CloudProvider) Check { - return &FailedInit{clock: clk, kubeClient: kubeClient, provider: provider} +func NewFailedInit(clk clock.Clock, provider cloudprovider.CloudProvider) Check { + return &FailedInit{clock: clk, provider: provider} } -func (f FailedInit) Check(ctx context.Context, n *v1.Node) ([]Issue, error) { - // ignore nodes that are deleting - if !n.DeletionTimestamp.IsZero() { +func (f FailedInit) Check(_ context.Context, node *v1.Node, m *v1alpha5.Machine) ([]Issue, error) { + // ignore machines that are deleting + if !m.DeletionTimestamp.IsZero() { return nil, nil } - - nodeAge := f.clock.Since(n.CreationTimestamp.Time) - // n is already initialized or not old enough - if n.Labels[v1alpha5.LabelNodeInitialized] == "true" || nodeAge < initFailureTime { + // machine is already initialized or isn't old enough + if m.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() || + f.clock.Now().Before(m.CreationTimestamp.Time.Add(initFailureTime)) { return nil, nil } - provisioner := &v1alpha5.Provisioner{} - if err := f.kubeClient.Get(ctx, types.NamespacedName{Name: n.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { - // provisioner is missing, node should be removed soon - return nil, client.IgnoreNotFound(err) - } - instanceTypes, err := f.provider.GetInstanceTypes(ctx, provisioner) - if err != 
nil {
-		return nil, err
-	}
-	instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == n.Labels[v1.LabelInstanceTypeStable] })
-	if !ok {
-		return []Issue{Issue(fmt.Sprintf("instance type %q not found", n.Labels[v1.LabelInstanceTypeStable]))}, nil
-	}
 	// detect startup taints which should be removed
 	var result []Issue
-	if taint, ok := node.IsStartupTaintRemoved(n, provisioner); !ok {
+	if taint, ok := lifecycle.IsStartupTaintRemoved(node, m); !ok {
 		result = append(result, Issue(fmt.Sprintf("startup taint %q is still on the node", formatTaint(taint))))
 	}
-	// and extended resources which never registered
-	if resource, ok := node.IsExtendedResourceRegistered(n, instanceType); !ok {
+	if resource, ok := lifecycle.RequestedResourcesRegistered(node, m); !ok {
 		result = append(result, Issue(fmt.Sprintf("expected resource %q didn't register on the node", resource)))
 	}
-
 	return result, nil
 }
diff --git a/pkg/controllers/consistency/nodeshape.go b/pkg/controllers/consistency/nodeshape.go
index 464daf331d..a81ff17f18 100644
--- a/pkg/controllers/consistency/nodeshape.go
+++ b/pkg/controllers/consistency/nodeshape.go
@@ -18,10 +18,7 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/samber/lo"
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
 	"github.com/aws/karpenter-core/pkg/cloudprovider"
@@ -29,44 +26,29 @@ import (
 
 // NodeShape detects nodes that have launched with 10% or less of any resource than was expected.
 type NodeShape struct {
-	kubeClient client.Client
-	provider   cloudprovider.CloudProvider
+	provider cloudprovider.CloudProvider
 }
 
-func NewNodeShape(kubeClient client.Client, provider cloudprovider.CloudProvider) Check {
+func NewNodeShape(provider cloudprovider.CloudProvider) Check {
 	return &NodeShape{
-		kubeClient: kubeClient,
-		provider:   provider,
+		provider: provider,
 	}
 }
 
-func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) {
-	// ignore nodes that are deleting
-	if !node.DeletionTimestamp.IsZero() {
+func (n *NodeShape) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) {
+	// ignore machines that are deleting
+	if !machine.DeletionTimestamp.IsZero() {
 		return nil, nil
 	}
-	// and nodes that haven't initialized yet
-	if node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
+	// and machines that haven't initialized yet
+	if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() {
 		return nil, nil
 	}
-	provisioner := &v1alpha5.Provisioner{}
-	if err := n.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil {
-		// provisioner is missing, node should be removed soon
-		return nil, client.IgnoreNotFound(err)
-	}
-	instanceTypes, err := n.provider.GetInstanceTypes(ctx, provisioner)
-	if err != nil {
-		return nil, err
-	}
-	instanceType, ok := lo.Find(instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == node.Labels[v1.LabelInstanceTypeStable] })
-	if !ok {
-		return []Issue{Issue(fmt.Sprintf("instance type %q not found", node.Labels[v1.LabelInstanceTypeStable]))}, nil
-	}
 	var issues []Issue
-	for resourceName, expectedQuantity := range instanceType.Capacity {
+	for resourceName, expectedQuantity := range machine.Status.Capacity {
 		nodeQuantity, ok := node.Status.Capacity[resourceName]
 		if !ok && !expectedQuantity.IsZero() {
-			issues = append(issues, 
Issue(fmt.Sprintf("expected resource %s not found", resourceName))) + issues = append(issues, Issue(fmt.Sprintf("expected resource \"%s\" not found", resourceName))) continue } @@ -75,6 +57,7 @@ func (n *NodeShape) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { issues = append(issues, Issue(fmt.Sprintf("expected %s of resource %s, but found %s (%0.1f%% of expected)", expectedQuantity.String(), resourceName, nodeQuantity.String(), pct*100))) } + } return issues, nil } diff --git a/pkg/controllers/consistency/suite_test.go b/pkg/controllers/consistency/suite_test.go index 706b4a6b63..de480e2bd7 100644 --- a/pkg/controllers/consistency/suite_test.go +++ b/pkg/controllers/consistency/suite_test.go @@ -124,16 +124,10 @@ var _ = Describe("Controller", func() { } ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("expected resource \"fake.com/vendor-a\" didn't register on the node")).To(BeTrue()) }) It("should detect issues with nodes that have a startup taint which isn't removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "my.startup.taint", - Effect: v1.TaintEffectNoSchedule, - }, - } machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -160,7 +154,7 @@ var _ = Describe("Controller", func() { }) ExpectApplied(ctx, env.Client, provisioner, machine, node) fakeClock.Step(2 * time.Hour) - ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("startup taint \"my.startup.taint:NoSchedule\" is still on the node")).To(BeTrue()) }) }) @@ -188,12 +182,12 @@ var _ = Describe("Controller", func() { Labels: podsLabels, MaxUnavailable: &intstr.IntOrString{IntVal: 0, Type: intstr.Int}, }) - node.Finalizers = []string{"prevent.deletion/now"} + machine.Finalizers = []string{"prevent.deletion/now"} p := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{Labels: podsLabels}}) ExpectApplied(ctx, env.Client, provisioner, machine, node, p, pdb) ExpectManualBinding(ctx, env.Client, p, node) - _ = env.Client.Delete(ctx, node) - ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node)) + _ = env.Client.Delete(ctx, machine) + ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent(fmt.Sprintf("can't drain node, PDB %s/%s is blocking evictions", pdb.Namespace, pdb.Name))).To(BeTrue()) }) }) @@ -223,7 +217,7 @@ var _ = Describe("Controller", func() { v1.ResourcePods: resource.MustParse("10"), } ExpectApplied(ctx, env.Client, provisioner, machine, node) - ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, consistencyController, client.ObjectKeyFromObject(machine)) Expect(recorder.DetectedEvent("expected 128Gi of resource memory, but found 64Gi (50.0% of expected)")).To(BeTrue()) }) }) diff --git a/pkg/controllers/consistency/termination.go b/pkg/controllers/consistency/termination.go index 8ffec98a45..b15313301e 100644 --- a/pkg/controllers/consistency/termination.go +++ b/pkg/controllers/consistency/termination.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" 
"sigs.k8s.io/controller-runtime/pkg/client" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" nodeutils "github.com/aws/karpenter-core/pkg/utils/node" ) @@ -36,9 +37,9 @@ func NewTermination(kubeClient client.Client) Check { } } -func (t *Termination) Check(ctx context.Context, node *v1.Node) ([]Issue, error) { +func (t *Termination) Check(ctx context.Context, node *v1.Node, machine *v1alpha5.Machine) ([]Issue, error) { // we are only looking at nodes that are hung deleting - if node.DeletionTimestamp.IsZero() { + if machine.DeletionTimestamp.IsZero() { return nil, nil } pdbs, err := deprovisioning.NewPDBLimits(ctx, t.kubeClient) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 09a72c38ca..df8ca51904 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/consistency" "github.com/aws/karpenter-core/pkg/controllers/counter" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" + "github.com/aws/karpenter-core/pkg/controllers/machine" metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner" metricsstate "github.com/aws/karpenter-core/pkg/controllers/metrics/state" @@ -55,7 +56,7 @@ func NewControllers( provisioner := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster) terminator := terminator.NewTerminator(clock, kubeClient, terminator.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), recorder)) - return []controller.Controller{ + controllers := []controller.Controller{ provisioner, metricsstate.NewController(cluster), deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, recorder, cluster), @@ -64,6 +65,7 @@ func NewControllers( informer.NewNodeController(kubeClient, cluster), informer.NewPodController(kubeClient, cluster), informer.NewProvisionerController(kubeClient, cluster), + informer.NewMachineController(kubeClient, cluster), node.NewController(clock, kubeClient, cloudProvider, cluster), termination.NewController(kubeClient, cloudProvider, terminator, recorder), metricspod.NewController(kubeClient), @@ -71,4 +73,6 @@ func NewControllers( counter.NewController(kubeClient, cluster), consistency.NewController(clock, kubeClient, recorder, cloudProvider), } + controllers = append(controllers, machine.NewControllers(clock, kubeClient, cloudProvider)...) + return controllers } diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go index ecd2e0e1b0..15fcb64257 100644 --- a/pkg/controllers/deprovisioning/consolidation.go +++ b/pkg/controllers/deprovisioning/consolidation.go @@ -86,15 +86,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []*Ca // ShouldDeprovision is a predicate used to filter deprovisionable nodes func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool { if val, ok := cn.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))...) 
return val != "true" } if cn.provisioner == nil { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, "provisioner is unknown")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, "provisioner is unknown")...) return false } if cn.provisioner.Spec.Consolidation == nil || !ptr.BoolValue(cn.provisioner.Spec.Consolidation.Enabled) { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(cn.Node, cn.Machine, fmt.Sprintf("provisioner %s has consolidation disabled", cn.provisioner.Name))...) return false } return true @@ -119,7 +119,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if !allPodsScheduled { // This method is used by multi-node consolidation as well, so we'll only report in the single node case if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "not all pods would schedule")...) } return Command{action: actionDoNothing}, nil } @@ -135,7 +135,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... // we're not going to turn a single node into multiple candidates if len(newMachines) != 1 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...) } return Command{action: actionDoNothing}, nil } @@ -149,7 +149,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice) if len(newMachines[0].InstanceTypeOptions) == 0 { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace with a cheaper node")...) } // no instance types remain after filtering by price return Command{action: actionDoNothing}, nil @@ -168,7 +168,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ... if allExistingAreSpot && newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) { if len(candidates) == 1 { - c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...) + c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, candidates[0].Machine, "can't replace a spot node with a spot node")...) 
} return Command{action: actionDoNothing}, nil } diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go index a94c114866..09794aa557 100644 --- a/pkg/controllers/deprovisioning/controller.go +++ b/pkg/controllers/deprovisioning/controller.go @@ -165,15 +165,15 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman } for _, candidate := range command.candidates { - c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, command.String())...) + c.recorder.Publish(deprovisioningevents.Terminating(candidate.Node, candidate.Machine, command.String())...) - if err := c.kubeClient.Delete(ctx, candidate.Node); err != nil { + if err := c.kubeClient.Delete(ctx, candidate.Machine); err != nil { if errors.IsNotFound(err) { continue } logging.FromContext(ctx).Errorf("terminating machine, %s", err) } else { - metrics.NodesTerminatedCounter.With(prometheus.Labels{ + metrics.MachinesTerminatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: reason, metrics.ProvisionerLabel: candidate.provisioner.Name, }).Inc() @@ -183,7 +183,7 @@ func (c *Controller) executeCommand(ctx context.Context, d Deprovisioner, comman // We wait for nodes to delete to ensure we don't start another round of deprovisioning until this node is fully // deleted. for _, oldCandidate := range command.candidates { - c.waitForDeletion(ctx, oldCandidate.Node) + c.waitForDeletion(ctx, oldCandidate.Machine) } return nil } @@ -233,8 +233,8 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name var once sync.Once pollStart := time.Now() return retry.Do(func() error { - node := &v1.Node{} - if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, node); err != nil { + machine := &v1alpha5.Machine{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: name}, machine); err != nil { // If the machine was deleted after a few seconds (to give the cache time to update), then we assume // that the machine was deleted due to an Insufficient Capacity error if errors.IsNotFound(err) && c.clock.Since(pollStart) > time.Second*5 { @@ -243,12 +243,12 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name return fmt.Errorf("getting machine, %w", err) } once.Do(func() { - c.recorder.Publish(deprovisioningevents.Launching(node, action.String())) + c.recorder.Publish(deprovisioningevents.Launching(machine, action.String())) }) - if _, ok := node.Labels[v1alpha5.LabelNodeInitialized]; !ok { + if !machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() { // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(node)) - return fmt.Errorf("node is not initialized") + c.recorder.Publish(deprovisioningevents.WaitingOnReadiness(machine)) + return fmt.Errorf("machine is not initialized") } return nil }, waitRetryOptions...) @@ -257,21 +257,21 @@ func (c *Controller) waitForReadiness(ctx context.Context, action Command, name // waitForDeletion waits for the specified machine to be removed from the API server. This deletion can take some period // of time if there are PDBs that govern pods on the machine as we need to wait until the node drains before // it's actually deleted. 
-func (c *Controller) waitForDeletion(ctx context.Context, node *v1.Node) { +func (c *Controller) waitForDeletion(ctx context.Context, machine *v1alpha5.Machine) { if err := retry.Do(func() error { - m := &v1.Node{} - nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(node), m) + m := &v1alpha5.Machine{} + nerr := c.kubeClient.Get(ctx, client.ObjectKeyFromObject(machine), m) // We expect the not machine found error, at which point we know the machine is deleted. if errors.IsNotFound(nerr) { return nil } // make the user aware of why deprovisioning is paused - c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(node)) + c.recorder.Publish(deprovisioningevents.WaitingOnDeletion(machine)) if nerr != nil { - return fmt.Errorf("expected node to be not found, %w", nerr) + return fmt.Errorf("expected machine to be not found, %w", nerr) } // the machine still exists - return fmt.Errorf("expected node to be not found") + return fmt.Errorf("expected machine to be not found") }, waitRetryOptions..., ); err != nil { logging.FromContext(ctx).Errorf("Waiting on machine deletion, %s", err) diff --git a/pkg/controllers/deprovisioning/drift.go b/pkg/controllers/deprovisioning/drift.go index bf52b81a5e..809c0dcc9d 100644 --- a/pkg/controllers/deprovisioning/drift.go +++ b/pkg/controllers/deprovisioning/drift.go @@ -75,7 +75,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman } // Log when all pods can't schedule, as the command will get executed immediately. if !allPodsScheduled { - logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") + logging.FromContext(ctx).With("machine", candidate.Machine.Name, "node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods") } if len(newMachines) == 0 { return Command{ diff --git a/pkg/controllers/deprovisioning/drift_test.go b/pkg/controllers/deprovisioning/drift_test.go index 8d2af628b3..924b5b71c7 100644 --- a/pkg/controllers/deprovisioning/drift_test.go +++ b/pkg/controllers/deprovisioning/drift_test.go @@ -79,8 +79,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("should ignore nodes with the disrupted annotation key, but not the drifted value", func() { node.Annotations = lo.Assign(node.Annotations, map[string]string{ @@ -99,8 +99,8 @@ var _ = Describe("Drift", func() { wg.Wait() // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("should ignore nodes without the disrupted annotation key", func() { delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey) @@ -114,8 +114,8 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) }) It("can delete drifted nodes", func() { node.Annotations = 
lo.Assign(node.Annotations, map[string]string{ @@ -133,9 +133,13 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // We should delete the machine that has drifted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace drifted nodes", func() { labels := map[string]string{ @@ -175,15 +179,21 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() - ExpectNotFound(ctx, env.Client, node) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + ExpectNotFound(ctx, env.Client, machine, node) // Expect that the new machine was created and its different than the original + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) It("can replace drifted nodes with multiple nodes", func() { @@ -273,12 +283,16 @@ var _ = Describe("Drift", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // expect that drift provisioned three nodes, one for each pod - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) }) It("should delete one drifted node at a time", func() { @@ -328,7 +342,11 @@ var _ = Describe("Drift", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // Expect one of the nodes to be deleted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) }) }) diff --git a/pkg/controllers/deprovisioning/emptiness.go b/pkg/controllers/deprovisioning/emptiness.go index 1b019706c6..adf8351c2c 100644 --- a/pkg/controllers/deprovisioning/emptiness.go +++ b/pkg/controllers/deprovisioning/emptiness.go @@ -64,7 +64,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, c *Candidate) bool { // ComputeCommand generates a deprovisioning command given deprovisionable machines func (e *Emptiness) ComputeCommand(_ context.Context, candidates ...*Candidate) (Command, error) { emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool { - return cn.Node.DeletionTimestamp.IsZero() && len(cn.pods) == 0 + return 
cn.Machine.DeletionTimestamp.IsZero() && len(cn.pods) == 0 }) if len(emptyCandidates) == 0 { diff --git a/pkg/controllers/deprovisioning/events/events.go b/pkg/controllers/deprovisioning/events/events.go index 337cd5695c..53cd07a6c4 100644 --- a/pkg/controllers/deprovisioning/events/events.go +++ b/pkg/controllers/deprovisioning/events/events.go @@ -20,10 +20,11 @@ import ( v1 "k8s.io/api/core/v1" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) -func Blocked(node *v1.Node, reason string) []events.Event { +func Blocked(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -32,10 +33,17 @@ func Blocked(node *v1.Node, reason string) []events.Event { Message: fmt.Sprintf("Cannot deprovision node due to %s", reason), DedupeValues: []string{node.Name, reason}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: "DeprovisioningBlocked", + Message: fmt.Sprintf("Cannot deprovision machine due to %s", reason), + DedupeValues: []string{machine.Name, reason}, + }, } } -func Terminating(node *v1.Node, reason string) []events.Event { +func Terminating(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -44,40 +52,47 @@ func Terminating(node *v1.Node, reason string) []events.Event { Message: fmt.Sprintf("Deprovisioning node via %s", reason), DedupeValues: []string{node.Name, reason}, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: "DeprovisioningTerminating", + Message: fmt.Sprintf("Deprovisioning machine via %s", reason), + DedupeValues: []string{machine.Name, reason}, + }, } } -func Launching(node *v1.Node, reason string) events.Event { +func Launching(machine *v1alpha5.Machine, reason string) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningLaunching", - Message: fmt.Sprintf("Launching node for %s", reason), - DedupeValues: []string{node.Name, reason}, + Message: fmt.Sprintf("Launching machine for %s", reason), + DedupeValues: []string{machine.Name, reason}, } } -func WaitingOnReadiness(node *v1.Node) events.Event { +func WaitingOnReadiness(machine *v1alpha5.Machine) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingReadiness", Message: "Waiting on readiness to continue deprovisioning", - DedupeValues: []string{node.Name}, + DedupeValues: []string{machine.Name}, } } -func WaitingOnDeletion(node *v1.Node) events.Event { +func WaitingOnDeletion(machine *v1alpha5.Machine) events.Event { return events.Event{ - InvolvedObject: node, + InvolvedObject: machine, Type: v1.EventTypeNormal, Reason: "DeprovisioningWaitingDeletion", Message: "Waiting on deletion to continue deprovisioning", - DedupeValues: []string{node.Name}, + DedupeValues: []string{machine.Name}, } } -func Unconsolidatable(node *v1.Node, reason string) []events.Event { +func Unconsolidatable(node *v1.Node, machine *v1alpha5.Machine, reason string) []events.Event { return []events.Event{ { InvolvedObject: node, @@ -87,5 +102,13 @@ func Unconsolidatable(node *v1.Node, reason string) []events.Event { DedupeValues: []string{node.Name}, DedupeTimeout: time.Minute * 15, }, + { + InvolvedObject: machine, + Type: v1.EventTypeNormal, + Reason: "Unconsolidatable", + Message: reason, + DedupeValues: []string{machine.Name}, + DedupeTimeout: 
time.Minute * 15, + }, } } diff --git a/pkg/controllers/deprovisioning/expiration_test.go b/pkg/controllers/deprovisioning/expiration_test.go index 611e5a3f33..34f2871b93 100644 --- a/pkg/controllers/deprovisioning/expiration_test.go +++ b/pkg/controllers/deprovisioning/expiration_test.go @@ -90,8 +90,9 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("can delete expired nodes", func() { ExpectApplied(ctx, env.Client, machine, node, prov) @@ -104,9 +105,13 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // Expect that the expired machine is gone + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("should expire one node at a time, starting with most expired", func() { expireProv := test.Provisioner(test.ProvisionerOptions{ @@ -151,9 +156,13 @@ var _ = Describe("Expiration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machineToExpire) + // Expect that one of the expired machines is gone + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, nodeToExpire) + ExpectNotFound(ctx, env.Client, machineToExpire, nodeToExpire) }) It("can replace node for expiration", func() { labels := map[string]string{ @@ -189,14 +198,20 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // Expect that the new machine was created, and it's different than the original - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) It("should uncordon nodes when expiration replacement fails", func() { @@ -234,6 +249,7 @@ var _ = Describe("Expiration", func() { var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) + ExpectNewMachinesDeleted(ctx, env.Client, &wg, 1) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).To(HaveOccurred()) wg.Wait() @@ -326,11 +342,15 @@ var _ = Describe("Expiration", func() { // deprovisioning won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3) + 
ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) }) diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go index 4ec13a88c7..4e3b2caa95 100644 --- a/pkg/controllers/deprovisioning/helpers.go +++ b/pkg/controllers/deprovisioning/helpers.go @@ -48,15 +48,15 @@ func filterCandidates(ctx context.Context, kubeClient client.Client, recorder ev // filter out nodes that can't be terminated nodes = lo.Filter(nodes, func(cn *Candidate, _ int) bool { if !cn.Node.DeletionTimestamp.IsZero() { - recorder.Publish(deprovisioningevents.Blocked(cn.Node, "in the process of deletion")...) + recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, "in the process of deletion")...) return false } if pdb, ok := pdbs.CanEvictPods(cn.pods); !ok { - recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) + recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pdb %s prevents pod evictions", pdb))...) return false } if p, ok := hasDoNotEvictPod(cn); ok { - recorder.Publish(deprovisioningevents.Blocked(cn.Node, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) + recorder.Publish(deprovisioningevents.Blocked(cn.Node, cn.Machine, fmt.Sprintf("pod %s/%s has do not evict annotation", p.Namespace, p.Name))...) 
return false } return true diff --git a/pkg/controllers/deprovisioning/metrics.go b/pkg/controllers/deprovisioning/metrics.go index 554b3c26bf..99e00ba0c1 100644 --- a/pkg/controllers/deprovisioning/metrics.go +++ b/pkg/controllers/deprovisioning/metrics.go @@ -44,8 +44,8 @@ var deprovisioningReplacementNodeInitializedHistogram = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: metrics.Namespace, Subsystem: deprovisioningSubsystem, - Name: "replacement_node_initialized_seconds", - Help: "Amount of time required for a replacement node to become initialized.", + Name: "replacement_machine_initialized_seconds", + Help: "Amount of time required for a replacement machine to become initialized.", Buckets: metrics.DurationBuckets(), }) diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index 6d6ea21185..b119e48f66 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -260,21 +260,26 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old machine until the new machine is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + // should create a new machine as there is a cheaper one that can hold the pod + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) // Expect that the new machine does not request the most expensive instance type - Expect(nodes[0].Name).ToNot(Equal(node.Name)) - Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) - Expect(scheduling.NewLabelRequirements(nodes[0].Labels).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) + Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Has(v1.LabelInstanceTypeStable)).To(BeTrue()) + Expect(scheduling.NewNodeSelectorRequirements(machines[0].Spec.Requirements...).Get(v1.LabelInstanceTypeStable).Has(mostExpensiveInstance.Name)).To(BeFalse()) // and delete the old one - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace nodes, considers PDB", func() { labels := map[string]string{ @@ -350,7 +355,7 @@ var _ = Describe("Replace Nodes", func() { // we didn't create a new machine or delete the old one Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("can replace nodes, PDB namespace must match", func() { labels := map[string]string{ @@ -419,13 +424,17 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, 
machine) + // should create a new machine as there is a cheaper one that can hold the pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node) + ExpectNotFound(ctx, env.Client, machine, node) }) It("can replace nodes, considers do-not-consolidate annotation", func() { labels := map[string]string{ @@ -509,9 +518,13 @@ var _ = Describe("Replace Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, regularMachine) + // we should delete the non-annotated node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, regularNode) + ExpectNotFound(ctx, env.Client, regularMachine, regularNode) }) It("won't replace node if any spot replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -609,8 +622,9 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("won't replace on-demand node if on-demand replacement is more expensive", func() { currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -723,8 +737,9 @@ var _ = Describe("Replace Nodes", func() { Expect(cluster.Consolidated()).To(BeTrue()) // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node) + ExpectExists(ctx, env.Client, machine) }) It("waits for node deletion to finish", func() { labels := map[string]string{ @@ -753,6 +768,7 @@ var _ = Describe("Replace Nodes", func() { }) machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{"unit-test.com/block-deletion"}, Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: prov.Name, v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, @@ -765,7 +781,6 @@ var _ = Describe("Replace Nodes", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")}, }, }) - node.Finalizers = []string{"unit-test.com/block-deletion"} ExpectApplied(ctx, env.Client, rs, pod, machine, node, prov) @@ -780,7 +795,7 @@ var _ = Describe("Replace Nodes", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) var consolidationFinished atomic.Bool go func() { @@ -791,25 +806,31 @@ var _ = Describe("Replace Nodes", func() { }() wg.Wait() - // node should still exist - ExpectExists(ctx, env.Client, node) + // machine should still exist + ExpectExists(ctx, env.Client, machine) // and consolidation should still be running waiting on the machine's deletion Expect(consolidationFinished.Load()).To(BeFalse()) - // fetch the latest node object and remove the finalizer - node = ExpectExists(ctx, env.Client, node) - ExpectFinalizersRemoved(ctx, env.Client, node) + // fetch the latest machine object and remove the finalizer + machine 
= ExpectExists(ctx, env.Client, machine) + ExpectFinalizersRemoved(ctx, env.Client, machine) // consolidation should complete now that the finalizer on the machine is gone and it can // was actually deleted Eventually(consolidationFinished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() - ExpectNotFound(ctx, env.Client, node) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + ExpectNotFound(ctx, env.Client, machine, node) // Expect that the new machine was created and its different than the original + machines := ExpectMachines(ctx, env.Client) nodes := ExpectNodes(ctx, env.Client) + Expect(machines).To(HaveLen(1)) Expect(nodes).To(HaveLen(1)) + Expect(machines[0].Name).ToNot(Equal(machine.Name)) Expect(nodes[0].Name).ToNot(Equal(node.Name)) }) }) @@ -894,10 +915,14 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // we don't need a new node, but we should evict everything off one of node2 which only has a single pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the old one - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, node2) }) It("can delete nodes, considers PDB", func() { var nl v1.NodeList @@ -955,11 +980,15 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (node1) as the pod on machine2 has a PDB preventing // eviction - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete nodes, considers do-not-evict", func() { // create our RS, so we can link a pod to it @@ -1002,10 +1031,14 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with more pods (machine1) as the pod on machine2 has a do-not-evict annotation - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete nodes, evicts pods without an ownerRef", func() { // create our RS so we can link a pod to it @@ -1046,11 +1079,15 @@ var _ = Describe("Delete Node", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // we don't need a new node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // but we expect to delete the machine with the fewest pods (machine 2) even though the pod has no ownerRefs // and will not be recreated - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, 
node2) }) }) @@ -1143,10 +1180,14 @@ var _ = Describe("Node Lifetime Consideration", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // the second node has more pods, so it would normally not be picked for consolidation, except it very little // lifetime remaining, so it should be deleted + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) }) @@ -1154,7 +1195,7 @@ var _ = Describe("Topology Consideration", func() { var prov *v1alpha5.Provisioner var zone1Machine, zone2Machine, zone3Machine *v1alpha5.Machine var zone1Node, zone2Node, zone3Node *v1.Node - var oldNodeNames sets.String + var oldMachineNames sets.String BeforeEach(func() { testZone1Instance := leastExpensiveInstanceWithZone("test-zone-1") @@ -1206,7 +1247,7 @@ var _ = Describe("Topology Consideration", func() { Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("1")}, }, }) - oldNodeNames = sets.NewString(zone1Node.Name, zone2Node.Name, zone3Node.Name) + oldMachineNames = sets.NewString(zone1Machine.Name, zone2Machine.Name, zone3Machine.Name) }) It("can replace node maintaining zonal topology spread", func() { labels := map[string]string{ @@ -1255,17 +1296,25 @@ var _ = Describe("Topology Consideration", func() { // consolidation won't delete the old node until the new node is ready var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, zone2Machine) + // should create a new node as there is a cheaper one that can hold the pod + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectNotFound(ctx, env.Client, zone2Node) + ExpectNotFound(ctx, env.Client, zone2Machine, zone2Node) - // Find the new node + // Find the new node associated with the machine + newMachine, ok := lo.Find(ExpectMachines(ctx, env.Client), func(m *v1alpha5.Machine) bool { + return !oldMachineNames.Has(m.Name) + }) + Expect(ok).To(BeTrue()) newNode, ok := lo.Find(ExpectNodes(ctx, env.Client), func(n *v1.Node) bool { - return !oldNodeNames.Has(n.Name) + return newMachine.Status.ProviderID == n.Spec.ProviderID }) Expect(ok).To(BeTrue()) @@ -1338,10 +1387,11 @@ var _ = Describe("Topology Consideration", func() { // our nodes are already the cheapest available, so we can't replace them. If we delete, it would // violate the anti-affinity rule, so we can't do anything. 
+ Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(3)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) - ExpectExists(ctx, env.Client, zone1Node) - ExpectExists(ctx, env.Client, zone2Node) - ExpectExists(ctx, env.Client, zone3Node) + ExpectExists(ctx, env.Client, zone1Machine) + ExpectExists(ctx, env.Client, zone2Machine) + ExpectExists(ctx, env.Client, zone3Machine) }) }) @@ -1400,9 +1450,13 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we should delete the empty node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("can delete multiple empty nodes with consolidation", func() { ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov) @@ -1415,10 +1469,14 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // we should delete the empty nodes + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine1) + ExpectNotFound(ctx, env.Client, machine2) }) It("can delete empty nodes with TTLSecondsAfterEmpty with the emptiness timestamp", func() { prov = test.Provisioner(test.ProvisionerOptions{TTLSecondsAfterEmpty: ptr.Int64(10)}) @@ -1444,9 +1502,13 @@ var _ = Describe("Empty Nodes", func() { ExpectTriggerVerifyAction(&wg) ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // we should delete the empty node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("considers pending pods when consolidating", func() { machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ @@ -1499,8 +1561,9 @@ var _ = Describe("Empty Nodes", func() { // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large // node to schedule, which prevents the large expensive node from being replaced + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) }) }) @@ -1650,7 +1713,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1658,9 +1721,13 @@ var _ = Describe("Consolidation TTL", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // machine should be deleted 
after the TTL due to emptiness + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) }) It("should wait for the node TTL for non-empty nodes before consolidating", func() { labels := map[string]string{ @@ -1730,8 +1797,8 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) - ExpectExists(ctx, env.Client, node2) + ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, machine2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -1739,9 +1806,13 @@ var _ = Describe("Consolidation TTL", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine2) + // machine should be deleted after the TTL due to emptiness + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node2) + ExpectNotFound(ctx, env.Client, machine2, node2) }) It("should not consolidate if the action becomes invalid during the node TTL wait", func() { pod := test.Pod() @@ -1765,7 +1836,7 @@ var _ = Describe("Consolidation TTL", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) // make the node non-empty by binding it ExpectManualBinding(ctx, env.Client, pod, node1) @@ -1778,8 +1849,9 @@ var _ = Describe("Consolidation TTL", func() { wg.Wait() // nothing should be removed since the node is no longer empty + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, node1) + ExpectExists(ctx, env.Client, machine1) }) }) @@ -1847,7 +1919,7 @@ var _ = Describe("Parallelization", func() { // Run the processing loop in parallel in the background with environment context var wg sync.WaitGroup - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectTriggerVerifyAction(&wg) go func() { defer GinkgoRecover() @@ -1855,11 +1927,12 @@ var _ = Describe("Parallelization", func() { }() wg.Wait() - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) // Add a new pending pod that should schedule while node is not yet deleted pod = test.UnschedulablePod() ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, provisioner, pod) + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(2)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) ExpectScheduled(ctx, env.Client, pod) }) @@ -1903,6 +1976,8 @@ var _ = Describe("Parallelization", func() { ExpectApplied(ctx, env.Client, rs, prov) ExpectProvisionedNoBinding(ctx, env.Client, cluster, cloudProvider, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...) 
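The TTL cases above all follow the same shape: run the deprovisioner in a goroutine, check that nothing is removed while the TTL has not elapsed, then step the fake clock past it. A condensed sketch of that pattern, assuming this suite's fakeClock, deprovisioningController, and expectation helpers:

// Sketch of the consolidation-TTL test pattern used in the cases above.
var wg sync.WaitGroup
wg.Add(1)
finished := atomic.Bool{}
go func() {
	defer GinkgoRecover()
	defer wg.Done()
	defer finished.Store(true)
	ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
}()

// Wait for the controller to block on the TTL, then verify nothing was deleted.
Eventually(fakeClock.HasWaiters, 10*time.Second).Should(BeTrue())
Expect(finished.Load()).To(BeFalse())
ExpectExists(ctx, env.Client, machine1)

// Advance past the 30s TTL; the blocked reconcile should now complete.
fakeClock.Step(31 * time.Second)
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()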
+ machines := ExpectMachines(ctx, env.Client) + Expect(machines).To(HaveLen(1)) nodes := ExpectNodes(ctx, env.Client) Expect(nodes).To(HaveLen(1)) @@ -2036,13 +2111,17 @@ var _ = Describe("Multi-Node Consolidation", func() { var wg sync.WaitGroup ExpectTriggerVerifyAction(&wg) - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2, machine3) + // three machines should be replaced with a single machine + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectNotFound(ctx, env.Client, node1, node2, node3) + ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2, machine3, node3) }) It("won't merge 2 nodes into 1 of the same type", func() { labels := map[string]string{ @@ -2103,15 +2182,20 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + // We have [cheap-node, cheap-node] which multi-node consolidation could consolidate via // [delete cheap-node, delete cheap-node, launch cheap-node]. This isn't the best method though // as we should instead just delete one of the nodes instead of deleting both and launching a single // identical replacement. This test verifies the filterOutSameType function from multi-node consolidation // works to ensure we perform the least-disruptive action. + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // should have just deleted the node with the fewest pods - ExpectNotFound(ctx, env.Client, node1) + ExpectNotFound(ctx, env.Client, machine1, node1) // and left the other node alone + ExpectExists(ctx, env.Client, machine2) ExpectExists(ctx, env.Client, node2) }) It("should wait for the node TTL for non-empty nodes before consolidating (multi-node)", func() { @@ -2145,7 +2229,7 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2}) var wg sync.WaitGroup - ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1) + ExpectMakeNewMachinesReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) wg.Add(1) finished := atomic.Bool{} @@ -2161,8 +2245,8 @@ var _ = Describe("Multi-Node Consolidation", func() { // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) // and the node should not be deleted yet - ExpectExists(ctx, env.Client, node1) - ExpectExists(ctx, env.Client, node2) + ExpectExists(ctx, env.Client, machine1) + ExpectExists(ctx, env.Client, machine2) // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) @@ -2170,10 +2254,14 @@ var _ = Describe("Multi-Node Consolidation", func() { Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + // should launch a single smaller replacement node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) // and delete the two 
large ones - ExpectNotFound(ctx, env.Client, node1, node2) + ExpectNotFound(ctx, env.Client, machine1, node1, machine2, node2) }) }) @@ -2266,47 +2354,6 @@ func ExpectNewMachinesDeleted(ctx context.Context, c client.Client, wg *sync.Wai }() } -func ExpectMakeNewNodesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, numNewNodes int) { - existingNodes := ExpectNodes(ctx, c) - existingNodeNames := sets.NewString(lo.Map(existingNodes, func(n *v1.Node, _ int) string { - return n.Name - })...) - - wg.Add(1) - go func() { - nodesMadeReady := 0 - ctx, cancel := context.WithTimeout(ctx, time.Second*10) // give up after 10s - defer GinkgoRecover() - defer wg.Done() - defer cancel() - for { - select { - case <-time.After(50 * time.Millisecond): - nodeList := &v1.NodeList{} - if err := c.List(ctx, nodeList); err != nil { - continue - } - for i := range nodeList.Items { - n := &nodeList.Items[i] - if existingNodeNames.Has(n.Name) { - continue - } - ExpectMakeNodesReady(ctx, c, n) - - nodesMadeReady++ - existingNodeNames.Insert(n.Name) - // did we make all the nodes ready that we expected? - if nodesMadeReady == numNewNodes { - return - } - } - case <-ctx.Done(): - Fail(fmt.Sprintf("waiting for nodes to be ready, %s", ctx.Err())) - } - } - }() -} - func ExpectMakeNewMachinesReady(ctx context.Context, c client.Client, wg *sync.WaitGroup, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, numNewMachines int) { diff --git a/pkg/controllers/deprovisioning/types.go b/pkg/controllers/deprovisioning/types.go index 57ac93f175..f980a2438d 100644 --- a/pkg/controllers/deprovisioning/types.go +++ b/pkg/controllers/deprovisioning/types.go @@ -90,6 +90,9 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, clk clock.Clock if node.Nominated() { return nil, fmt.Errorf("state node is nominated") } + if node.Node == nil || node.Machine == nil { + return nil, fmt.Errorf("state node doesn't contain both a node and a machine") + } pods, err := node.Pods(ctx, kubeClient) if err != nil { diff --git a/pkg/controllers/machine/lifecycle/registration.go b/pkg/controllers/machine/lifecycle/registration.go index 9ea096b798..a4a25ae2cc 100644 --- a/pkg/controllers/machine/lifecycle/registration.go +++ b/pkg/controllers/machine/lifecycle/registration.go @@ -69,6 +69,9 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine) metrics.MachinesRegisteredCounter.With(prometheus.Labels{ metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey], }).Inc() + metrics.NodesCreatedCounter.With(prometheus.Labels{ + metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey], + }).Inc() return reconcile.Result{}, nil } diff --git a/pkg/controllers/node/controller.go b/pkg/controllers/node/controller.go index 91678683ba..bf74c01f19 100644 --- a/pkg/controllers/node/controller.go +++ b/pkg/controllers/node/controller.go @@ -48,24 +48,21 @@ var _ corecontroller.TypedController[*v1.Node] = (*Controller)(nil) // Controller manages a set of properties on karpenter provisioned nodes, such as // taints, labels, finalizers. 
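Because the registration reconciler above now bumps metrics.NodesCreatedCounter (in addition to MachinesRegisteredCounter) when a machine registers its node, a test can read the counter back through prometheus' testutil package. A minimal sketch, where testutil is github.com/prometheus/client_golang/prometheus/testutil and the provisioner label value is illustrative:

// Sketch: reading the per-provisioner node-creation counter back in a test.
created := testutil.ToFloat64(metrics.NodesCreatedCounter.With(prometheus.Labels{
	metrics.ProvisionerLabel: "default", // illustrative provisioner name
}))
Expect(created).To(BeNumerically(">=", 1))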
type Controller struct { - kubeClient client.Client - cluster *state.Cluster - initialization *Initialization - emptiness *Emptiness - finalizer *Finalizer - drift *Drift - expiration *Expiration + kubeClient client.Client + cluster *state.Cluster + emptiness *Emptiness + drift *Drift + expiration *Expiration } // NewController constructs a nodeController instance func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) corecontroller.Controller { return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ - kubeClient: kubeClient, - cluster: cluster, - initialization: &Initialization{kubeClient: kubeClient, cloudProvider: cloudProvider}, - emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, - drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, - expiration: &Expiration{clock: clk}, + kubeClient: kubeClient, + cluster: cluster, + emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster}, + drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider}, + expiration: &Expiration{clock: clk}, }) } @@ -82,20 +79,18 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re if !node.DeletionTimestamp.IsZero() { return reconcile.Result{}, nil } - provisioner := &v1alpha5.Provisioner{} if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: node.Labels[v1alpha5.ProvisionerNameLabelKey]}, provisioner); err != nil { return reconcile.Result{}, client.IgnoreNotFound(err) } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", provisioner.Name)) // Execute Reconcilers var results []reconcile.Result var errs error reconcilers := []nodeReconciler{ - c.initialization, c.emptiness, - c.finalizer, c.expiration, c.drift, } diff --git a/pkg/controllers/node/drift.go b/pkg/controllers/node/drift.go index 7ed774ae60..1b954e3eeb 100644 --- a/pkg/controllers/node/drift.go +++ b/pkg/controllers/node/drift.go @@ -19,13 +19,12 @@ import ( "fmt" "time" + "github.com/samber/lo" v1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/samber/lo" - "github.com/aws/karpenter-core/pkg/apis/settings" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" diff --git a/pkg/controllers/node/emptiness.go b/pkg/controllers/node/emptiness.go index fbbc363e65..eeb3656dae 100644 --- a/pkg/controllers/node/emptiness.go +++ b/pkg/controllers/node/emptiness.go @@ -46,22 +46,18 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi if provisioner.Spec.TTLSecondsAfterEmpty == nil { return reconcile.Result{}, nil } - // node is not ready yet, so we don't consider it to possibly be empty if n.Labels[v1alpha5.LabelNodeInitialized] != "true" { return reconcile.Result{}, nil } - empty, err := r.isEmpty(ctx, n) if err != nil { return reconcile.Result{}, err } - // node is empty, but it is in-use per the last scheduling round so we don't consider it empty if r.cluster.IsNodeNominated(n.Name) { return reconcile.Result{}, nil } - _, hasEmptinessTimestamp := n.Annotations[v1alpha5.EmptinessTimestampAnnotationKey] if !empty && hasEmptinessTimestamp { delete(n.Annotations, v1alpha5.EmptinessTimestampAnnotationKey) @@ -72,7 +68,6 @@ func (r *Emptiness) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisi }) logging.FromContext(ctx).Infof("added TTL to empty node") } - 
// Short requeue result so that we requeue to check for emptiness when the node nomination time ends return reconcile.Result{RequeueAfter: time.Minute}, nil } diff --git a/pkg/controllers/node/finalizer.go b/pkg/controllers/node/finalizer.go deleted file mode 100644 index 2a160553b1..0000000000 --- a/pkg/controllers/node/finalizer.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package node - -import ( - "context" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/pkg/ptr" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" -) - -// Finalizer is a subreconciler that ensures nodes have the termination -// finalizer. This protects against instances that launch when Karpenter fails -// to create the node object. In this case, the node will come online without -// the termination finalizer. This controller will update the node accordingly. -type Finalizer struct{} - -// Reconcile reconciles the node -func (r *Finalizer) Reconcile(_ context.Context, provisioner *v1alpha5.Provisioner, node *v1.Node) (reconcile.Result, error) { - if !node.DeletionTimestamp.IsZero() { - return reconcile.Result{}, nil - } - node.OwnerReferences = []metav1.OwnerReference{{ - APIVersion: v1alpha5.SchemeGroupVersion.String(), - Kind: "Provisioner", - Name: provisioner.Name, - UID: provisioner.UID, - BlockOwnerDeletion: ptr.Bool(true), - }} - controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer) - return reconcile.Result{}, nil -} diff --git a/pkg/controllers/node/initialization.go b/pkg/controllers/node/initialization.go deleted file mode 100644 index 37da64b16a..0000000000 --- a/pkg/controllers/node/initialization.go +++ /dev/null @@ -1,125 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package node - -import ( - "context" - "fmt" - - "github.com/samber/lo" - v1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" - - "github.com/aws/karpenter-core/pkg/cloudprovider" - "github.com/aws/karpenter-core/pkg/utils/node" - "github.com/aws/karpenter-core/pkg/utils/resources" -) - -type Initialization struct { - kubeClient client.Client - cloudProvider cloudprovider.CloudProvider -} - -// Reconcile reconciles the node -func (r *Initialization) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner, n *v1.Node) (reconcile.Result, error) { - // node has been previously determined to be ready, so there's nothing to do - if n.Labels[v1alpha5.LabelNodeInitialized] == "true" { - return reconcile.Result{}, nil - } - - // node is not ready per the label, we need to check if kubelet indicates that the node is ready as well as if - // startup taints are removed and extended resources have been initialized - instanceType, err := r.getInstanceType(ctx, provisioner, n.Labels[v1.LabelInstanceTypeStable]) - if err != nil { - return reconcile.Result{}, fmt.Errorf("determining instance type, %w", err) - } - if !r.isInitialized(n, provisioner, instanceType) { - return reconcile.Result{}, nil - } - - n.Labels[v1alpha5.LabelNodeInitialized] = "true" - return reconcile.Result{}, nil -} - -func (r *Initialization) getInstanceType(ctx context.Context, provisioner *v1alpha5.Provisioner, instanceTypeName string) (*cloudprovider.InstanceType, error) { - instanceTypes, err := r.cloudProvider.GetInstanceTypes(ctx, provisioner) - if err != nil { - return nil, err - } - // The instance type may not be found which can occur if the instance type label was removed/edited. This shouldn't occur, - // but if it does we only lose the ability to check for extended resources. - return lo.FindOrElse(instanceTypes, nil, func(it *cloudprovider.InstanceType) bool { return it.Name == instanceTypeName }), nil -} - -// isInitialized returns true if the node has: -// a) its current status is set to Ready -// b) all the startup taints have been removed from the node -// c) all extended resources have been registered -// This method handles both nil provisioners and nodes without extended resources gracefully. 
-func (r *Initialization) isInitialized(n *v1.Node, provisioner *v1alpha5.Provisioner, instanceType *cloudprovider.InstanceType) bool { - // fast checks first - if node.GetCondition(n, v1.NodeReady).Status != v1.ConditionTrue { - return false - } - if _, ok := IsStartupTaintRemoved(n, provisioner); !ok { - return false - } - - if _, ok := IsExtendedResourceRegistered(n, instanceType); !ok { - return false - } - return true -} - -// IsStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup -// taints have been removed from the node -func IsStartupTaintRemoved(node *v1.Node, provisioner *v1alpha5.Provisioner) (*v1.Taint, bool) { - if provisioner != nil { - for _, startupTaint := range provisioner.Spec.StartupTaints { - for i := 0; i < len(node.Spec.Taints); i++ { - // if the node still has a startup taint applied, it's not ready - if startupTaint.MatchTaint(&node.Spec.Taints[i]) { - return &node.Spec.Taints[i], false - } - } - } - } - return nil, true -} - -// IsExtendedResourceRegistered returns true if there are no extended resources on the node, or they have all been -// registered by device plugins -func IsExtendedResourceRegistered(node *v1.Node, instanceType *cloudprovider.InstanceType) (v1.ResourceName, bool) { - if instanceType == nil { - // no way to know, so assume they're registered - return "", true - } - for resourceName, quantity := range instanceType.Capacity { - if quantity.IsZero() { - continue - } - // kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our - // annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't - // registered it yet. - // We wait on allocatable since this is the value that is used in scheduling - if resources.IsZero(node.Status.Allocatable[resourceName]) { - return resourceName, false - } - } - return "", true -} diff --git a/pkg/controllers/node/suite_test.go b/pkg/controllers/node/suite_test.go index 460802bf42..e72ae8e868 100644 --- a/pkg/controllers/node/suite_test.go +++ b/pkg/controllers/node/suite_test.go @@ -20,7 +20,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clock "k8s.io/utils/clock/testing" @@ -121,15 +120,19 @@ var _ = Describe("Controller", func() { }) It("should annotate the node when it has drifted in the cloud provider", func() { cp.Drifted = true - node := test.Node(test.NodeOptions{ + machine, node := test.MachineAndNode(v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ v1alpha5.ProvisionerNameLabelKey: provisioner.Name, v1.LabelInstanceTypeStable: test.RandomName(), }, }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + }, }) - ExpectApplied(ctx, env.Client, provisioner, node) + ExpectApplied(ctx, env.Client, provisioner, machine, node) + ExpectMakeMachinesReady(ctx, env.Client, machine) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) node = ExpectNodeExists(ctx, env.Client, node.Name) Expect(node.Annotations).To(HaveKeyWithValue(v1alpha5.VoluntaryDisruptionAnnotationKey, v1alpha5.VoluntaryDisruptionDriftedAnnotationValue)) @@ -152,206 +155,8 @@ var _ = Describe("Controller", func() { Expect(node.Annotations).ToNot(HaveKey(v1alpha5.VoluntaryDisruptionAnnotationKey)) }) }) - - Context("Initialization", func() { - It("should initialize the node when ready", func() { - node := 
test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionTrue, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when not ready", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionFalse, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should initialize the node when extended resources are registered", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when extended resource isn't registered", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when capacity is filled but allocatable isn't set", func() { - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - v1.LabelInstanceTypeStable: "gpu-vendor-instance-type", - }, - }, - ReadyStatus: v1.ConditionTrue, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: 
resource.MustParse("4"), - v1.ResourceMemory: resource.MustParse("4Gi"), - v1.ResourcePods: resource.MustParse("5"), - fake.ResourceGPUVendorA: resource.MustParse("2"), - }, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should initialize the node when startup taints are removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: "example.com/startup-taint2", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - } - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - ReadyStatus: v1.ConditionTrue, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).To(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - It("should not initialize the node when startup taints aren't removed", func() { - provisioner.Spec.StartupTaints = []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - { - Key: "example.com/startup-taint2", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - } - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: provisioner.Name, - }, - }, - Taints: []v1.Taint{ - { - Key: "example.com/startup-taint1", - Value: "true", - Effect: v1.TaintEffectNoExecute, - }, - }, - ReadyStatus: v1.ConditionTrue, - }) - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Labels).ToNot(HaveKey(v1alpha5.LabelNodeInitialized)) - }) - }) Context("Emptiness", func() { - It("should not TTL nodes that have ready status unknown", func() { - provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) - node := test.Node(test.NodeOptions{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, - ReadyStatus: v1.ConditionUnknown, - }) - - ExpectApplied(ctx, env.Client, provisioner, node) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) - - node = ExpectNodeExists(ctx, env.Client, node.Name) - Expect(node.Annotations).ToNot(HaveKey(v1alpha5.EmptinessTimestampAnnotationKey)) - }) - It("should not TTL nodes that have ready status false", func() { + It("should not TTL nodes that are not initialized", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}}, @@ -367,7 +172,10 @@ var _ = Describe("Controller", func() { It("should label nodes as underutilized and add TTL", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - 
Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1alpha5.LabelNodeInitialized: "true", + }, }}) ExpectApplied(ctx, env.Client, provisioner, node) @@ -385,7 +193,10 @@ var _ = Describe("Controller", func() { It("should remove labels from non-empty nodes", func() { provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30) node := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: provisioner.Name, + v1alpha5.LabelNodeInitialized: "true", + }, Annotations: map[string]string{ v1alpha5.EmptinessTimestampAnnotationKey: fakeClock.Now().Add(100 * time.Second).Format(time.RFC3339), }}, @@ -446,57 +257,6 @@ var _ = Describe("Controller", func() { Expect(node.Annotations).ToNot(HaveKey(v1alpha5.VoluntaryDisruptionAnnotationKey)) }) }) - Context("Finalizer", func() { - It("should add the termination finalizer if missing", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{"fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(ConsistOf(n.Finalizers[0], v1alpha5.TerminationFinalizer)) - }) - It("should do nothing if terminating", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{"fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - Expect(env.Client.Delete(ctx, n)).To(Succeed()) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(Equal(n.Finalizers)) - }) - It("should do nothing if the termination finalizer already exists", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - Finalizers: []string{v1alpha5.TerminationFinalizer, "fake.com/finalizer"}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.Finalizers).To(Equal(n.Finalizers)) - }) - It("should add an owner reference to the node", func() { - n := test.Node(test.NodeOptions{ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}, - }}) - ExpectApplied(ctx, env.Client, provisioner, n) - ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(n)) - n = ExpectNodeExists(ctx, env.Client, n.Name) - Expect(n.OwnerReferences).To(Equal([]metav1.OwnerReference{{ - APIVersion: v1alpha5.SchemeGroupVersion.String(), - Kind: "Provisioner", - Name: provisioner.Name, - UID: provisioner.UID, - BlockOwnerDeletion: ptr.Bool(true), - }})) - }) - }) Context("Filters", func() { BeforeEach(func() { innerCtx, cancel := context.WithCancel(ctx) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 772feb626d..7b940eecfb 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ 
b/pkg/controllers/provisioning/provisioner.go @@ -19,14 +19,11 @@ import ( "fmt" "time" - "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "go.uber.org/multierr" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -317,6 +314,7 @@ func (p *Provisioner) Schedule(ctx context.Context) ([]*scheduler.Machine, []*sc func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ...functional.Option[LaunchOptions]) (string, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provisioner", m.Labels[v1alpha5.ProvisionerNameLabelKey])) + // Check limits latest := &v1alpha5.Provisioner{} if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: m.ProvisionerName}, latest); err != nil { @@ -328,56 +326,21 @@ func (p *Provisioner) Launch(ctx context.Context, m *scheduler.Machine, opts ... options := functional.ResolveOptions(opts...) logging.FromContext(ctx).Infof("launching %s", m) - created, err := p.cloudProvider.Create( - logging.WithLogger(ctx, logging.FromContext(ctx).Named("cloudprovider")), - m.ToMachine(latest), - ) - if err != nil { - return "", fmt.Errorf("creating cloud provider instance, %w", err) - } - k8sNode := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: created.Name, - Labels: created.Labels, - }, - Spec: v1.NodeSpec{ - ProviderID: created.Status.ProviderID, - }, - } - ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", k8sNode.Name)) - - if err := mergo.Merge(k8sNode, m.ToNode()); err != nil { - return "", fmt.Errorf("merging cloud provider node, %w", err) - } - // ensure we clear out the status - k8sNode.Status = v1.NodeStatus{} - - // Idempotently create a node. In rare cases, nodes can come online and - // self register before the controller is able to register a node object - // with the API server. In the common case, we create the node object - // ourselves to enforce the binding decision and enable images to be pulled - // before the node is fully Ready. - if _, err := p.coreV1Client.Nodes().Create(ctx, k8sNode, metav1.CreateOptions{}); err != nil { - if errors.IsAlreadyExists(err) { - logging.FromContext(ctx).Debugf("node already registered") - } else { - return "", fmt.Errorf("creating node %s, %w", k8sNode.Name, err) - } - } - if err := p.cluster.UpdateNode(ctx, k8sNode); err != nil { - return "", fmt.Errorf("updating cluster state, %w", err) + machine := m.ToMachine(latest) + if err := p.kubeClient.Create(ctx, machine); err != nil { + return "", err } - metrics.NodesCreatedCounter.With(prometheus.Labels{ + p.cluster.NominateNodeForPod(ctx, machine.Name) + metrics.MachinesCreatedCounter.With(prometheus.Labels{ metrics.ReasonLabel: options.Reason, - metrics.ProvisionerLabel: k8sNode.Labels[v1alpha5.ProvisionerNameLabelKey], + metrics.ProvisionerLabel: machine.Labels[v1alpha5.ProvisionerNameLabelKey], }).Inc() - p.cluster.NominateNodeForPod(ctx, k8sNode.Name) - if options.RecordPodNomination { + if functional.ResolveOptions(opts...).RecordPodNomination { for _, pod := range m.Pods { - p.recorder.Publish(schedulingevents.NominatePod(pod, k8sNode)...) + p.recorder.Publish(schedulingevents.NominatePod(pod, nil, machine)...) 
} } - return k8sNode.Name, nil + return machine.Name, nil } func (p *Provisioner) getDaemonSetPods(ctx context.Context) ([]*v1.Pod, error) { diff --git a/pkg/controllers/provisioning/scheduling/events/events.go b/pkg/controllers/provisioning/scheduling/events/events.go index 35d3120019..90e90011a7 100644 --- a/pkg/controllers/provisioning/scheduling/events/events.go +++ b/pkg/controllers/provisioning/scheduling/events/events.go @@ -20,23 +20,39 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/flowcontrol" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" ) // PodNominationRateLimiter is a pointer so it rate-limits across events var PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) -func NominatePod(pod *v1.Pod, node *v1.Node) []events.Event { - return []events.Event{ - { +// PodNominationRateLimiterForMachine is a pointer so it rate-limits across events +var PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) + +func NominatePod(pod *v1.Pod, node *v1.Node, machine *v1alpha5.Machine) []events.Event { + var evts []events.Event + if node != nil { + evts = append(evts, events.Event{ InvolvedObject: pod, Type: v1.EventTypeNormal, Reason: "Nominated", Message: fmt.Sprintf("Pod should schedule on node: %s", node.Name), DedupeValues: []string{string(pod.UID)}, RateLimiter: PodNominationRateLimiter, - }, + }) + } + if machine != nil { + evts = append(evts, events.Event{ + InvolvedObject: pod, + Type: v1.EventTypeNormal, + Reason: "NominatedMachine", + Message: fmt.Sprintf("Pod should schedule on node associated with machine: %s", machine.Name), + DedupeValues: []string{string(pod.UID)}, + RateLimiter: PodNominationRateLimiterForMachine, + }) } + return evts } func PodFailedToSchedule(pod *v1.Pod, err error) events.Event { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index 539d5e6ede..d051b39151 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -145,7 +145,7 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, s.cluster.NominateNodeForPod(ctx, existing.Name()) } for _, pod := range existing.Pods { - s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node)...) + s.recorder.Publish(schedulingevents.NominatePod(pod, existing.Node, existing.Machine)...) 
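Splitting the nomination events across PodNominationRateLimiter and PodNominationRateLimiterForMachine keeps the node- and machine-flavored events on independent token buckets (5 QPS, burst 10), which is what the event suite changes below assert: 10 of each reason in a tight burst, 15 when spread over a few seconds. A standalone sketch of that bucket behaviour, using the same client-go flowcontrol package:

// Sketch: a 5 QPS / burst-10 token bucket, matching both nomination limiters.
rl := flowcontrol.NewTokenBucketRateLimiter(5, 10)
accepted := 0
for i := 0; i < 100; i++ {
	if rl.TryAccept() {
		accepted++
	}
}
// A tight loop only drains the burst, so roughly 10 publishes are accepted.
fmt.Println(accepted)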
} } diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 84a9b767be..4f5256a5cd 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -1800,12 +1800,15 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) - // delete the node + // delete the node/machine + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node + machine1.Finalizers = nil node1.Finalizers = nil - ExpectApplied(ctx, env.Client, node1) - ExpectDeleted(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) + ExpectDeleted(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) + ExpectReconcileSucceeded(ctx, machineStateController, client.ObjectKeyFromObject(machine1)) secondPod := test.UnschedulablePod(opts) ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, secondPod) @@ -1919,7 +1922,9 @@ var _ = Describe("In-Flight Nodes", func() { bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, initialPod) ExpectScheduled(ctx, env.Client, initialPod) + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node + machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) // delete the pod so that the node is empty ExpectDeleted(ctx, env.Client, initialPod) @@ -1929,7 +1934,7 @@ var _ = Describe("In-Flight Nodes", func() { Value: "tainted", Effect: v1.TaintEffectNoSchedule, }) - ExpectApplied(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) secondPod := test.UnschedulablePod() @@ -1983,11 +1988,13 @@ var _ = Describe("In-Flight Nodes", func() { // Mark it initialized which only occurs once the startup taint was removed and re-apply only the startup taint. // We also need to add resource capacity as after initialization we assume that kubelet has recorded them. 
+ + machine1 := bindings.Get(initialPod).Machine node1 := bindings.Get(initialPod).Node - node1.Labels[v1alpha5.LabelNodeInitialized] = "true" + machine1.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) node1.Spec.Taints = []v1.Taint{startupTaint} node1.Status.Capacity = v1.ResourceList{v1.ResourcePods: resource.MustParse("10")} - ExpectApplied(ctx, env.Client, node1) + ExpectApplied(ctx, env.Client, machine1, node1) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index faa309f926..4e45851857 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -80,6 +80,11 @@ func NewCluster(clk clock.Clock, client client.Client, cp cloudprovider.CloudPro // of the cluster is as close to correct as it can be when we begin to perform operations // utilizing the cluster state as our source of truth func (c *Cluster) Synced(ctx context.Context) bool { + machineList := &v1alpha5.MachineList{} + if err := c.kubeClient.List(ctx, machineList); err != nil { + logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) + return false + } nodeList := &v1.NodeList{} if err := c.kubeClient.List(ctx, nodeList); err != nil { logging.FromContext(ctx).Errorf("checking cluster state sync, %v", err) @@ -90,6 +95,13 @@ func (c *Cluster) Synced(ctx context.Context) bool { c.mu.RUnlock() providerIDs := sets.New[string]() + for _, machine := range machineList.Items { + // If the machine hasn't resolved its provider id, then it hasn't resolved its status + if machine.Status.ProviderID == "" { + return false + } + providerIDs.Insert(machine.Status.ProviderID) + } for _, node := range nodeList.Items { if node.Spec.ProviderID == "" { node.Spec.ProviderID = node.Name diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 2aa44fb395..6ae08afe8c 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -1331,7 +1331,6 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeTrue()) }) It("shouldn't consider the cluster state synced if a machine hasn't resolved its provider id", func() { - Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ @@ -1349,7 +1348,6 @@ var _ = Describe("Cluster State Sync", func() { Expect(cluster.Synced(ctx)).To(BeFalse()) }) It("shouldn't consider the cluster state synced if a machine isn't tracked", func() { - Skip("enable this test when cluster state sync relies on machines") // Deploy 1000 machines and sync them all with the cluster for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index 7f6fa608e2..9173ff812b 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -36,6 +37,7 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/termination/terminator" terminatorevents "github.com/aws/karpenter-core/pkg/controllers/termination/terminator/events" "github.com/aws/karpenter-core/pkg/events" + 
"github.com/aws/karpenter-core/pkg/metrics" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" machineutil "github.com/aws/karpenter-core/pkg/utils/machine" ) @@ -104,6 +106,9 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error { if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); err != nil { return client.IgnoreNotFound(fmt.Errorf("patching node, %w", err)) } + metrics.NodesTerminatedCounter.With(prometheus.Labels{ + metrics.ProvisionerLabel: n.Labels[v1alpha5.ProvisionerNameLabelKey], + }).Inc() logging.FromContext(ctx).Infof("deleted node") } return nil diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index faf2c592a1..318cdda892 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -77,12 +77,14 @@ var _ = BeforeEach(func() { internalRecorder = NewInternalRecorder() eventRecorder = events.NewRecorder(internalRecorder) schedulingevents.PodNominationRateLimiter = flowcontrol.NewTokenBucketRateLimiter(5, 10) + schedulingevents.PodNominationRateLimiterForMachine = flowcontrol.NewTokenBucketRateLimiter(5, 10) }) var _ = Describe("Event Creation", func() { It("should create a NominatePod event", func() { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(1)) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(1)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(1)) }) It("should create a EvictPod event", func() { eventRecorder.Publish(terminatorevents.EvictPod(PodWithUID())) @@ -133,18 +135,20 @@ var _ = Describe("Dedupe", func() { var _ = Describe("Rate Limiting", func() { It("should only create max-burst when many events are created quickly", func() { for i := 0; i < 100; i++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(10)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(10)) }) It("should allow many events over time due to smoothed rate limiting", func() { for i := 0; i < 3; i++ { for j := 0; j < 5; j++ { - eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())...) + eventRecorder.Publish(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())...) 
} time.Sleep(time.Second) } - Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID())[0].Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[0].Reason)).To(Equal(15)) + Expect(internalRecorder.Calls(schedulingevents.NominatePod(PodWithUID(), NodeWithUID(), MachineWithUID())[1].Reason)).To(Equal(15)) }) }) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c02ea946ca..9c51894b50 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -25,71 +25,69 @@ const ( ) var ( - NodesCreatedCounter = prometheus.NewCounterVec( + MachinesCreatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: nodeSubsystem, + Subsystem: machineSubsystem, Name: "created", - Help: "Number of nodes created in total by Karpenter. Labeled by reason the node was created and the owning provisioner.", + Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was created and the owning provisioner.", }, []string{ ReasonLabel, ProvisionerLabel, }, ) - NodesTerminatedCounter = prometheus.NewCounterVec( + MachinesTerminatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: nodeSubsystem, + Subsystem: machineSubsystem, Name: "terminated", - Help: "Number of nodes terminated in total by Karpenter. Labeled by reason the node was terminated and the owning provisioner.", + Help: "Number of machines terminated in total by Karpenter. Labeled by reason the machine was terminated.", }, []string{ ReasonLabel, ProvisionerLabel, }, ) - MachinesCreatedCounter = prometheus.NewCounterVec( + MachinesRegisteredCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, Subsystem: machineSubsystem, - Name: "created", - Help: "Number of machines created in total by Karpenter. Labeled by reason the machine was created and the owning provisioner.", + Name: "registered", + Help: "Number of machines registered in total by Karpenter. Labeled by the owning provisioner.", }, []string{ - ReasonLabel, ProvisionerLabel, }, ) - MachinesTerminatedCounter = prometheus.NewCounterVec( + MachinesInitializedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, Subsystem: machineSubsystem, - Name: "terminated", - Help: "Number of machines terminated in total by Karpenter. Labeled by reason the machine was terminated and the owning provisioner.", + Name: "initialized", + Help: "Number of machines initialized in total by Karpenter. Labeled by the owning provisioner.", }, []string{ - ReasonLabel, ProvisionerLabel, }, ) - MachinesRegisteredCounter = prometheus.NewCounterVec( + NodesCreatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: machineSubsystem, - Name: "registered", - Help: "Number of machines registered in total by Karpenter. Labeled by the owning provisioner.", + Subsystem: nodeSubsystem, + Name: "created", + Help: "Number of nodes created in total by Karpenter. Labeled by owning provisioner.", }, []string{ ProvisionerLabel, }, ) - MachinesInitializedCounter = prometheus.NewCounterVec( + NodesTerminatedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: Namespace, - Subsystem: machineSubsystem, - Name: "initialized", - Help: "Number of machines initialized in total by Karpenter. 
Labeled by the owning provisioner.", + Subsystem: nodeSubsystem, + Name: "terminated", + Help: "Number of nodes terminated in total by Karpenter. Labeled by owning provisioner.", }, []string{ ProvisionerLabel, @@ -98,5 +96,6 @@ var ( ) func MustRegister() { - crmetrics.Registry.MustRegister(NodesCreatedCounter, NodesTerminatedCounter) + crmetrics.Registry.MustRegister(MachinesCreatedCounter, MachinesTerminatedCounter, MachinesRegisteredCounter, + MachinesInitializedCounter, NodesCreatedCounter, NodesTerminatedCounter) } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 596669524c..40de0b3204 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/aws/karpenter-core/pkg/apis" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/events" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/injection" @@ -127,6 +128,12 @@ func NewOperator() (context.Context, *Operator) { lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.nodeName", func(o client.Object) []string { return []string{o.(*v1.Pod).Spec.NodeName} }), "failed to setup pod indexer") + lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1.Node{}, "spec.providerID", func(o client.Object) []string { + return []string{o.(*v1.Node).Spec.ProviderID} + }), "failed to setup node provider id indexer") + lo.Must0(manager.GetFieldIndexer().IndexField(ctx, &v1alpha5.Machine{}, "status.providerID", func(o client.Object) []string { + return []string{o.(*v1alpha5.Machine).Status.ProviderID} + }), "failed to setup machine provider id indexer") return ctx, &Operator{ Manager: manager, diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 2770bb0aa0..a9dc5a32a4 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -236,12 +236,13 @@ func ExpectFinalizersRemovedFromList(ctx context.Context, c client.Client, objec func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objs ...client.Object) { for _, obj := range objs { - ExpectWithOffset(1, c.Get(ctx, client.ObjectKeyFromObject(obj), obj)).To(Succeed()) + ExpectWithOffset(1, client.IgnoreNotFound(c.Get(ctx, client.ObjectKeyFromObject(obj), obj))).To(Succeed()) stored := obj.DeepCopyObject().(client.Object) obj.SetFinalizers([]string{}) ExpectWithOffset(1, client.IgnoreNotFound(c.Patch(ctx, obj, client.MergeFrom(stored)))).To(Succeed()) } } + func ExpectProvisioned(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { bindings := ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) podKeys := sets.NewString(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() })...) @@ -259,7 +260,7 @@ func ExpectProvisionedNoBinding(ctx context.Context, c client.Client, cluster *s return ExpectProvisionedNoBindingWithOffset(1, ctx, c, cluster, cloudProvider, provisioner, pods...) 
} -func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, _ *state.Cluster, _ cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { +func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, provisioner *provisioning.Provisioner, pods ...*v1.Pod) Bindings { // Persist objects for _, pod := range pods { ExpectAppliedWithOffset(offset+1, ctx, c, pod) @@ -273,13 +274,24 @@ func ExpectProvisionedNoBindingWithOffset(offset int, ctx context.Context, c cli if err != nil { return bindings } - for _, pod := range m.Pods { - bindings[pod] = &Binding{Node: ExpectNodeExistsWithOffset(offset+1, ctx, c, name)} + machine := &v1alpha5.Machine{} + ExpectWithOffset(offset+1, c.Get(ctx, types.NamespacedName{Name: name}, machine)).To(Succeed()) + machine, node := ExpectMachineDeployedWithOffset(offset+1, ctx, c, cluster, cloudProvider, machine) + if machine != nil && node != nil { + for _, pod := range m.Pods { + bindings[pod] = &Binding{ + Machine: machine, + Node: node, + } + } } } for _, node := range nodes { for _, pod := range node.Pods { - bindings[pod] = &Binding{Node: node.Node} + bindings[pod] = &Binding{ + Node: node.Node, + Machine: node.Machine, + } } } return bindings @@ -320,6 +332,8 @@ func ExpectMachinesCascadeDeletion(ctx context.Context, c client.Client, machine for _, node := range nodes { if node.Spec.ProviderID == machine.Status.ProviderID { Expect(c.Delete(ctx, node)) + ExpectFinalizersRemoved(ctx, c, node) + ExpectNotFound(ctx, c, node) } } }
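The provider-ID field indexes registered in operator.go above give callers an indexed path from a machine to its backing node without scanning the full node list; the cascade-deletion expectation right above does the equivalent scan by hand. A self-contained sketch of the indexed lookup, where nodeForMachine is illustrative and not part of this patch:

package example // hypothetical package for the sketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// nodeForMachine resolves the node backing a machine via the "spec.providerID"
// index; it assumes the client is backed by a cache with that index registered.
func nodeForMachine(ctx context.Context, c client.Client, machine *v1alpha5.Machine) (*v1.Node, error) {
	nodeList := &v1.NodeList{}
	if err := c.List(ctx, nodeList, client.MatchingFields{"spec.providerID": machine.Status.ProviderID}); err != nil {
		return nil, err
	}
	if len(nodeList.Items) != 1 {
		return nil, fmt.Errorf("expected 1 node for machine %q, found %d", machine.Name, len(nodeList.Items))
	}
	return &nodeList.Items[0], nil
}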