diff --git a/Makefile b/Makefile index 6b3d519526..5d1e5a154d 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ test: ## Run tests go test ./... \ -race \ --ginkgo.focus="${FOCUS}" \ + --ginkgo.v \ -cover -coverprofile=coverage.out -outputdir=. -coverpkg=./... deflake: ## Run randomized, racing tests until the test fails to catch flakes diff --git a/hack/toolchain.sh b/hack/toolchain.sh index 35c0288f5e..285d861be9 100755 --- a/hack/toolchain.sh +++ b/hack/toolchain.sh @@ -20,6 +20,7 @@ tools() { go install github.com/sigstore/cosign/cmd/cosign@v1.10.0 go install github.com/gohugoio/hugo@v0.97.3+extended go install golang.org/x/vuln/cmd/govulncheck@v0.0.0-20220902211423-27dd78d2ca39 + go install github.com/onsi/ginkgo/v2/ginkgo@latest if ! echo "$PATH" | grep -q "${GOPATH:-undefined}/bin\|$HOME/go/bin"; then echo "Go workspace's \"bin\" directory is not in PATH. Run 'export PATH=\"\$PATH:\${GOPATH:-\$HOME/go}/bin\"'." diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index ea0d2a5f73..fdb96f9d22 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -41,10 +41,11 @@ var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) type CloudProvider struct { InstanceTypes []*cloudprovider.InstanceType + mu sync.RWMutex // CreateCalls contains the arguments for every create call that was made since it was cleared - mu sync.RWMutex CreateCalls []*v1alpha5.Machine AllowedCreateCalls int + NextCreateErr error CreatedMachines map[string]*v1alpha5.Machine Drifted bool } @@ -63,12 +64,19 @@ func (c *CloudProvider) Reset() { c.CreateCalls = []*v1alpha5.Machine{} c.CreatedMachines = map[string]*v1alpha5.Machine{} c.AllowedCreateCalls = math.MaxInt + c.NextCreateErr = nil } func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) { c.mu.Lock() defer c.mu.Unlock() + if c.NextCreateErr != nil { + temp := c.NextCreateErr + c.NextCreateErr = nil + return nil, temp + } + c.CreateCalls = append(c.CreateCalls, machine) if len(c.CreateCalls) > c.AllowedCreateCalls { return &v1alpha5.Machine{}, fmt.Errorf("erroring as number of AllowedCreateCalls has been exceeded") @@ -188,6 +196,9 @@ func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error { } func (c *CloudProvider) IsMachineDrifted(context.Context, *v1alpha5.Machine) (bool, error) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.Drifted, nil } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index a93535da69..6508c1c6b9 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -146,17 +146,17 @@ func (ofs Offerings) Cheapest() Offering { // MachineNotFoundError is an error type returned by CloudProviders when the reason for failure is NotFound type MachineNotFoundError struct { - Err error + error } func NewMachineNotFoundError(err error) *MachineNotFoundError { return &MachineNotFoundError{ - Err: err, + error: err, } } func (e *MachineNotFoundError) Error() string { - return fmt.Sprintf("machine not found, %s", e.Err) + return fmt.Sprintf("machine not found, %s", e.error) } func IsMachineNotFoundError(err error) bool { @@ -173,3 +173,33 @@ func IgnoreMachineNotFoundError(err error) error { } return err } + +// InsufficientCapacityError is an error type returned by CloudProviders when a launch fails due to a lack of capacity from machine requirements +type InsufficientCapacityError struct { + error +} + +func NewInsufficientCapacityError(err error) 
*InsufficientCapacityError { + return &InsufficientCapacityError{ + error: err, + } +} + +func (e *InsufficientCapacityError) Error() string { + return fmt.Sprintf("insufficient capacity, %s", e.error) +} + +func IsInsufficientCapacityError(err error) bool { + if err == nil { + return false + } + var icErr *InsufficientCapacityError + return errors.As(err, &icErr) +} + +func IgnoreInsufficientCapacityError(err error) error { + if IsInsufficientCapacityError(err) { + return nil + } + return err +} diff --git a/pkg/controllers/deprovisioning/consolidation.go b/pkg/controllers/deprovisioning/consolidation.go index ede22e540d..f690683367 100644 --- a/pkg/controllers/deprovisioning/consolidation.go +++ b/pkg/controllers/deprovisioning/consolidation.go @@ -29,8 +29,10 @@ import ( "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" + deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events" "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/karpenter-core/pkg/events" "github.com/aws/karpenter-core/pkg/metrics" "github.com/aws/karpenter-core/pkg/scheduling" ) @@ -43,19 +45,19 @@ type consolidation struct { kubeClient client.Client provisioner *provisioning.Provisioner cloudProvider cloudprovider.CloudProvider - reporter *Reporter + recorder events.Recorder lastConsolidationState int64 } func makeConsolidation(clock clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, - cloudProvider cloudprovider.CloudProvider, reporter *Reporter) consolidation { + cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) consolidation { return consolidation{ clock: clock, cluster: cluster, kubeClient: kubeClient, provisioner: provisioner, cloudProvider: cloudProvider, - reporter: reporter, + recorder: recorder, lastConsolidationState: 0, } } @@ -79,7 +81,7 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []Can // filter out nodes that can't be terminated nodes = lo.Filter(nodes, func(cn CandidateNode, _ int) bool { if reason, canTerminate := canBeTerminated(cn, pdbs); !canTerminate { - c.reporter.RecordUnconsolidatableReason(ctx, cn.Node, reason) + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node, reason)) return false } return true @@ -94,15 +96,15 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []Can // ShouldDeprovision is a predicate used to filter deprovisionable nodes func (c *consolidation) ShouldDeprovision(ctx context.Context, n *state.Node, provisioner *v1alpha5.Provisioner, _ []*v1.Pod) bool { if val, ok := n.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok { - c.reporter.RecordUnconsolidatableReason(ctx, n.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey)) + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))) return val != "true" } if provisioner == nil { - c.reporter.RecordUnconsolidatableReason(ctx, n.Node, "provisioner is unknown") + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, "provisioner is unknown")) return false } if provisioner.Spec.Consolidation == nil || !ptr.BoolValue(provisioner.Spec.Consolidation.Enabled) { - c.reporter.RecordUnconsolidatableReason(ctx, n.Node, fmt.Sprintf("provisioner %s has 
consolidation disabled", provisioner.Name)) + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, fmt.Sprintf("provisioner %s has consolidation disabled", provisioner.Name))) return false } return true @@ -191,7 +193,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi if !allPodsScheduled { // This method is used by multi-node consolidation as well, so we'll only report in the single node case if len(nodes) == 1 { - c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "not all pods would schedule") + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "not all pods would schedule")) } return Command{action: actionDoNothing}, nil } @@ -207,7 +209,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi // we're not going to turn a single node into multiple nodes if len(newNodes) != 1 { if len(nodes) == 1 { - c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, fmt.Sprintf("can't remove without creating %d nodes", len(newNodes))) + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, fmt.Sprintf("can't remove without creating %d nodes", len(newNodes)))) } return Command{action: actionDoNothing}, nil } @@ -221,7 +223,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi newNodes[0].InstanceTypeOptions = filterByPrice(newNodes[0].InstanceTypeOptions, newNodes[0].Requirements, nodesPrice) if len(newNodes[0].InstanceTypeOptions) == 0 { if len(nodes) == 1 { - c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "can't replace with a cheaper node") + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "can't replace with a cheaper node")) } // no instance types remain after filtering by price return Command{action: actionDoNothing}, nil @@ -240,7 +242,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi if allExistingAreSpot && newNodes[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) { if len(nodes) == 1 { - c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "can't replace a spot node with a spot node") + c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "can't replace a spot node with a spot node")) } return Command{action: actionDoNothing}, nil } diff --git a/pkg/controllers/deprovisioning/controller.go b/pkg/controllers/deprovisioning/controller.go index 1fbaa36256..3e0dff2cf9 100644 --- a/pkg/controllers/deprovisioning/controller.go +++ b/pkg/controllers/deprovisioning/controller.go @@ -52,7 +52,6 @@ type Controller struct { clock clock.Clock cloudProvider cloudprovider.CloudProvider deprovisioners []Deprovisioner - reporter *Reporter } // pollingPeriod that we inspect cluster to look for opportunities to deprovision @@ -73,14 +72,12 @@ var waitRetryOptions = []retry.Option{ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster) *Controller { - reporter := NewReporter(recorder) return &Controller{ clock: clk, kubeClient: kubeClient, cluster: cluster, provisioner: provisioner, recorder: recorder, - reporter: reporter, cloudProvider: cp, deprovisioners: []Deprovisioner{ // Expire any nodes that must be deleted, allowing their pods to potentially land on currently @@ -90,11 +87,11 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi 
// Delete any remaining empty nodes as there is zero cost in terms of dirsuption. Emptiness and // emptyNodeConsolidation are mutually exclusive, only one of these will operate NewEmptiness(clk), - NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter), + NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder), // Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn - NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter), + NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder), // And finally fall back our single node consolidation to further reduce cluster cost. - NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter), + NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder), }, } } diff --git a/pkg/controllers/deprovisioning/drift_test.go b/pkg/controllers/deprovisioning/drift_test.go index 58553aae4b..435a710060 100644 --- a/pkg/controllers/deprovisioning/drift_test.go +++ b/pkg/controllers/deprovisioning/drift_test.go @@ -15,6 +15,7 @@ limitations under the License. package deprovisioning_test import ( + "sync" "time" . "github.com/onsi/ginkgo/v2" @@ -61,9 +62,11 @@ var _ = Describe("Drift", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() Expect(cloudProvider.CreateCalls).To(HaveLen(0)) ExpectExists(ctx, env.Client, node) @@ -93,9 +96,12 @@ var _ = Describe("Drift", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() Expect(cloudProvider.CreateCalls).To(HaveLen(0)) ExpectExists(ctx, env.Client, node) @@ -154,9 +160,11 @@ var _ = Describe("Drift", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node, but we should evict everything off one of node2 which only has a single pod Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -207,9 +215,11 @@ var _ = Describe("Drift", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // deprovisioning won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -299,9 +309,11 @@ var _ = Describe("Drift", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // deprovisioning won't delete 
the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 3, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -344,10 +356,13 @@ var _ = Describe("Drift", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node, but we should evict everything off one of node2 which only has a single pod Expect(cloudProvider.CreateCalls).To(HaveLen(0)) diff --git a/pkg/controllers/deprovisioning/emptynodeconsolidation.go b/pkg/controllers/deprovisioning/emptynodeconsolidation.go index 1246da7b36..8ac757b3d2 100644 --- a/pkg/controllers/deprovisioning/emptynodeconsolidation.go +++ b/pkg/controllers/deprovisioning/emptynodeconsolidation.go @@ -28,6 +28,7 @@ import ( "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/karpenter-core/pkg/events" ) // EmptyNodeConsolidation is the consolidation controller that performs multi-node consolidation of entirely empty nodes @@ -36,8 +37,8 @@ type EmptyNodeConsolidation struct { } func NewEmptyNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, - provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, reporter *Reporter) *EmptyNodeConsolidation { - return &EmptyNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter)} + provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder) *EmptyNodeConsolidation { + return &EmptyNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder)} } // ComputeCommand generates a deprovisioning command given deprovisionable nodes diff --git a/pkg/controllers/deprovisioning/events/events.go b/pkg/controllers/deprovisioning/events/events.go index 78e2b6574d..4150695ba7 100644 --- a/pkg/controllers/deprovisioning/events/events.go +++ b/pkg/controllers/deprovisioning/events/events.go @@ -16,6 +16,7 @@ package events import ( "fmt" + "time" v1 "k8s.io/api/core/v1" @@ -69,5 +70,6 @@ func UnconsolidatableReason(node *v1.Node, reason string) events.Event { Reason: "Unconsolidatable", Message: reason, DedupeValues: []string{node.Name}, + DedupeTimeout: time.Minute * 15, } } diff --git a/pkg/controllers/deprovisioning/expiration_test.go b/pkg/controllers/deprovisioning/expiration_test.go index ec497dd5be..184d985365 100644 --- a/pkg/controllers/deprovisioning/expiration_test.go +++ b/pkg/controllers/deprovisioning/expiration_test.go @@ -15,6 +15,7 @@ limitations under the License. package deprovisioning_test import ( + "sync" "time" . 
"github.com/onsi/ginkgo/v2" @@ -86,10 +87,14 @@ var _ = Describe("Expiration", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node, but we should evict everything off one of node2 which only has a single pod Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -130,10 +135,13 @@ var _ = Describe("Expiration", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(nodeToExpire)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(nodeNotExpire)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node, but we should evict everything off one of node2 which only has a single pod Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -183,9 +191,11 @@ var _ = Describe("Expiration", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // deprovisioning won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -275,9 +285,11 @@ var _ = Describe("Expiration", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).To(HaveOccurred()) + wg.Wait() // Expiration should try to make 3 calls but fail for the third. 
Expect(cloudProvider.CreateCalls).To(HaveLen(3)) @@ -365,9 +377,11 @@ var _ = Describe("Expiration", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // deprovisioning won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 3, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() diff --git a/pkg/controllers/deprovisioning/multinodeconsolidation.go b/pkg/controllers/deprovisioning/multinodeconsolidation.go index 7f06c9fadc..55e95086c4 100644 --- a/pkg/controllers/deprovisioning/multinodeconsolidation.go +++ b/pkg/controllers/deprovisioning/multinodeconsolidation.go @@ -27,6 +27,7 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling" "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/karpenter-core/pkg/events" ) type MultiNodeConsolidation struct { @@ -34,8 +35,8 @@ type MultiNodeConsolidation struct { } func NewMultiNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, - provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, reporter *Reporter) *MultiNodeConsolidation { - return &MultiNodeConsolidation{makeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter)} + provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder) *MultiNodeConsolidation { + return &MultiNodeConsolidation{makeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder)} } func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, candidates ...CandidateNode) (Command, error) { diff --git a/pkg/controllers/deprovisioning/reporter.go b/pkg/controllers/deprovisioning/reporter.go deleted file mode 100644 index 584c92bd87..0000000000 --- a/pkg/controllers/deprovisioning/reporter.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deprovisioning - -import ( - "context" - "time" - - v1 "k8s.io/api/core/v1" - - deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events" - "github.com/aws/karpenter-core/pkg/events" - "github.com/aws/karpenter-core/pkg/utils/pretty" -) - -// Reporter is used to periodically report node statuses regarding deprovisioning. This gives observers awareness of why -// deprovisioning of a particular node isn't occurring. -type Reporter struct { - cm *pretty.ChangeMonitor - recorder events.Recorder -} - -func NewReporter(recorder events.Recorder) *Reporter { - // This change monitor is used by the deprovisioning reporter to report why nodes can't be deprovisioned - // periodically. 
The reporter can be called as often as is convenient and it will prevent these notifications from - // flooding events. - cm := pretty.NewChangeMonitor() - cm.Reconfigure(15 * time.Minute) - - return &Reporter{ - recorder: recorder, - cm: cm, - } -} - -// RecordUnconsolidatableReason is used to periodically report why a node is unconsolidatable to it can be logged -func (r *Reporter) RecordUnconsolidatableReason(ctx context.Context, node *v1.Node, reason string) { - if r.cm.HasChanged(string(node.UID), "consolidation") { - r.recorder.Publish(deprovisioningevents.UnconsolidatableReason(node, reason)) - } -} diff --git a/pkg/controllers/deprovisioning/singlenodeconsolidation.go b/pkg/controllers/deprovisioning/singlenodeconsolidation.go index b7f6d94eec..b42af4a16b 100644 --- a/pkg/controllers/deprovisioning/singlenodeconsolidation.go +++ b/pkg/controllers/deprovisioning/singlenodeconsolidation.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/state" + "github.com/aws/karpenter-core/pkg/events" ) // SingleNodeConsolidation is the consolidation controller that performs single node consolidation. @@ -33,8 +34,8 @@ type SingleNodeConsolidation struct { } func NewSingleNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner, - cp cloudprovider.CloudProvider, reporter *Reporter) *SingleNodeConsolidation { - return &SingleNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter)} + cp cloudprovider.CloudProvider, recorder events.Recorder) *SingleNodeConsolidation { + return &SingleNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder)} } // ComputeCommand generates a deprovisioning command given deprovisionable nodes diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index 7c457c7d4d..ab7aba3b32 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -59,7 +59,6 @@ var ctx context.Context var env *test.Environment var cluster *state.Cluster var deprovisioningController *deprovisioning.Controller -var provisioningController controller.Controller var provisioner *provisioning.Provisioner var cloudProvider *fake.CloudProvider var nodeStateController controller.Controller @@ -84,23 +83,12 @@ var _ = BeforeSuite(func() { cluster = state.NewCluster(fakeClock, env.Client, cloudProvider) nodeStateController = informer.NewNodeController(env.Client, cluster) provisioner = provisioning.NewProvisioner(ctx, env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster) - provisioningController = provisioning.NewController(env.Client, provisioner, events.NewRecorder(&record.FakeRecorder{})) }) var _ = AfterSuite(func() { Expect(env.Stop()).To(Succeed(), "Failed to stop environment") }) -func triggerVerifyAction() { - for i := 0; i < 10; i++ { - time.Sleep(250 * time.Millisecond) - if fakeClock.HasWaiters() { - break - } - } - fakeClock.Step(45 * time.Second) -} - var _ = BeforeEach(func() { cloudProvider.CreateCalls = nil cloudProvider.InstanceTypes = fake.InstanceTypesAssorted() @@ -128,6 +116,7 @@ var _ = BeforeEach(func() { } fakeClock.SetTime(time.Now()) deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, 
provisioner, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster) + // Reset Feature Flags to test defaults ctx = settings.ToContext(ctx, test.Settings(settings.Settings{DriftEnabled: true})) }) @@ -245,9 +234,11 @@ var _ = Describe("Replace Nodes", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // consolidation won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -391,9 +382,11 @@ var _ = Describe("Replace Nodes", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // consolidation won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -475,9 +468,12 @@ var _ = Describe("Replace Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(regularNode)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(annotatedNode)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() Expect(cloudProvider.CreateCalls).To(HaveLen(0)) // we should delete the non-annotated node @@ -565,8 +561,11 @@ var _ = Describe("Replace Nodes", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) + wg.Wait() + Expect(cluster.Consolidated()).To(BeTrue()) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -669,8 +668,11 @@ var _ = Describe("Replace Nodes", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) + wg.Wait() + Expect(cluster.Consolidated()).To(BeTrue()) Expect(err).ToNot(HaveOccurred()) Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -720,11 +722,13 @@ var _ = Describe("Replace Nodes", func() { Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) // consolidation won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + fakeClock.Step(10 * time.Minute) var consolidationFinished atomic.Bool - go triggerVerifyAction() go func() { _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) @@ -739,9 +743,8 @@ var _ = Describe("Replace Nodes", func() { 
Expect(consolidationFinished.Load()).To(BeFalse()) // fetch the latest node object and remove the finalizer - Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed()) - node.SetFinalizers([]string{}) - Expect(env.Client.Update(ctx, node)).To(Succeed()) + node = ExpectExists(ctx, env.Client, node) + ExpectFinalizersRemoved(ctx, env.Client, node) // consolidation should complete now that the finalizer on the node is gone and it can // was actually deleted @@ -818,10 +821,13 @@ var _ = Describe("Delete Node", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node, but we should evict everything off one of node2 which only has a single pod Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -910,10 +916,14 @@ var _ = Describe("Delete Node", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -988,10 +998,13 @@ var _ = Describe("Delete Node", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1063,10 +1076,13 @@ var _ = Describe("Delete Node", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need a new node Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1146,10 +1162,13 @@ var _ = Describe("Node Lifetime Consideration", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.SetTime(time.Now()) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // the second node has more pods so it would normally not be picked for consolidation, except it very little // 
lifetime remaining so it should be deleted @@ -1243,9 +1262,11 @@ var _ = Describe("Topology Consideration", func() { ExpectSkew(ctx, env.Client, "default", &tsc).To(ConsistOf(1, 1, 1)) // consolidation won't delete the old node until the new node is ready - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, zone1Node, zone2Node, zone3Node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, zone1Node, zone2Node, zone3Node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -1345,9 +1366,11 @@ var _ = Describe("Topology Consideration", func() { ExpectScheduled(ctx, env.Client, pods[1]) ExpectScheduled(ctx, env.Client, pods[2]) - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, zone1Node, zone2Node, zone3Node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, zone1Node, zone2Node, zone3Node) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -1386,9 +1409,12 @@ var _ = Describe("Empty Nodes", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need any new nodes Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1429,10 +1455,13 @@ var _ = Describe("Empty Nodes", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need any new nodes Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1464,9 +1493,11 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need any new nodes Expect(cloudProvider.CreateCalls).To(HaveLen(0)) @@ -1513,9 +1544,11 @@ var _ = Describe("Empty Nodes", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large // node to schedule, which prevents the large expensive node from being replaced @@ -1673,7 +1706,6 @@ var _ = Describe("consolidation TTL", func() { prov := test.Provisioner(test.ProvisionerOptions{ Consolidation: &v1alpha5.Consolidation{Enabled: ptr.Bool(true)}, }) - node1 := 
test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -1777,13 +1809,15 @@ var _ = Describe("Parallelization", func() { fakeClock.Step(10 * time.Minute) // Run the processing loop in parallel in the background with environment context - go triggerVerifyAction() go func() { _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) }() - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node) + Eventually(func(g Gomega) { // should create a new node as there is a cheaper one that can hold the pod nodes := &v1.NodeList{} @@ -1959,9 +1993,12 @@ var _ = Describe("Multi-Node Consolidation", func() { ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node3)) + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node1, node2, node3) + fakeClock.Step(10 * time.Minute) - wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node1, node2, node3) - go triggerVerifyAction() _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) wg.Wait() @@ -2034,10 +2071,13 @@ var _ = Describe("Multi-Node Consolidation", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) + fakeClock.Step(10 * time.Minute) - go triggerVerifyAction() + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) _, err := deprovisioningController.Reconcile(ctx, reconcile.Request{}) Expect(err).ToNot(HaveOccurred()) + wg.Wait() // We have [cheap-node, cheap-node] which multi-node consolidation could consolidate via // [delete cheap-node, delete cheap-node, launch cheap-node]. 
This isn't the best method though @@ -2087,7 +2127,6 @@ var _ = Describe("Multi-Node Consolidation", func() { v1.ResourceCPU: resource.MustParse("32"), v1.ResourcePods: resource.MustParse("100"), }}) - node2 := test.Node(test.NodeOptions{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -2114,8 +2153,11 @@ var _ = Describe("Multi-Node Consolidation", func() { // inform cluster state about the nodes ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1)) ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2)) - ExpectMakeNewNodesReady(ctx, env.Client, 1, node1, node2) + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node1, node2) + wg.Add(1) finished := atomic.Bool{} go func() { @@ -2180,9 +2222,7 @@ func fromInt(i int) *intstr.IntOrString { return &v } -func ExpectMakeNewNodesReady(ctx context.Context, client client.Client, numNewNodes int, existingNodes ...*v1.Node) *sync.WaitGroup { - var wg sync.WaitGroup - +func ExpectMakeNewNodesReady(ctx context.Context, client client.Client, wg *sync.WaitGroup, numNewNodes int, existingNodes ...*v1.Node) { existingNodeNames := sets.NewString() for _, existing := range existingNodes { existingNodeNames.Insert(existing.Name) @@ -2222,7 +2262,20 @@ func ExpectMakeNewNodesReady(ctx context.Context, client client.Client, numNewNo } } }() - return &wg +} + +func ExpectTriggerVerifyAction(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + time.Sleep(250 * time.Millisecond) + if fakeClock.HasWaiters() { + break + } + } + fakeClock.Step(45 * time.Second) + }() } func ExpectMakeNodesReady(ctx context.Context, c client.Client, nodes ...*v1.Node) { diff --git a/pkg/controllers/machine/launch.go b/pkg/controllers/machine/launch.go index 7db13a364e..07b9c9574b 100644 --- a/pkg/controllers/machine/launch.go +++ b/pkg/controllers/machine/launch.go @@ -42,6 +42,10 @@ func (l *Launch) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reco logging.FromContext(ctx).Debugf("creating machine") retrieved, err = l.cloudProvider.Create(ctx, machine) if err != nil { + if cloudprovider.IsInsufficientCapacityError(err) { + logging.FromContext(ctx).Error(err) + return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine)) + } return reconcile.Result{}, fmt.Errorf("creating machine, %w", err) } } else { diff --git a/pkg/controllers/machine/launch_test.go b/pkg/controllers/machine/launch_test.go index 39eacc9e11..6c6119f50f 100644 --- a/pkg/controllers/machine/launch_test.go +++ b/pkg/controllers/machine/launch_test.go @@ -15,12 +15,15 @@ limitations under the License. package machine_test import ( + "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/test" . 
"github.com/onsi/ginkgo/v2" @@ -112,4 +115,13 @@ var _ = Describe("Launch", func() { machine = ExpectExists(ctx, env.Client, machine) Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue)) }) + It("should delete the machine if InsufficientCapacity is returned from the cloudprovider", func() { + cloudProvider.NextCreateErr = cloudprovider.NewInsufficientCapacityError(fmt.Errorf("all instance types were unavailable")) + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) // Reconcile again to handle termination flow + + ExpectNotFound(ctx, env.Client, machine) + }) }) diff --git a/pkg/controllers/machine/terminator/terminator.go b/pkg/controllers/machine/terminator/terminator.go index e8aa87a7da..02ecd4b4cb 100644 --- a/pkg/controllers/machine/terminator/terminator.go +++ b/pkg/controllers/machine/terminator/terminator.go @@ -111,7 +111,8 @@ func (t *Terminator) TerminateNode(ctx context.Context, node *v1.Node) error { if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { return err } - terminationSummary.Observe(time.Since(node.DeletionTimestamp.Time).Seconds()) + // We use stored.DeletionTimestamp since the api-server may give back a node after the patch without a deletionTimestamp + terminationSummary.Observe(time.Since(stored.DeletionTimestamp.Time).Seconds()) } return nil } diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index cc5fe75ed9..8b0e35f0f7 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -66,17 +66,17 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res return reconcile.Result{}, nil } if err := c.terminator.Cordon(ctx, node); err != nil { - return reconcile.Result{}, fmt.Errorf("cordoning node, %w", err) + return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("cordoning node, %w", err)) } if err := c.terminator.Drain(ctx, node); err != nil { if terminator.IsNodeDrainError(err) { c.recorder.Publish(events.NodeFailedToDrain(node, err)) return reconcile.Result{Requeue: true}, nil } - return reconcile.Result{}, fmt.Errorf("draining node, %w", err) + return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("draining node, %w", err)) } if err := c.terminator.TerminateNode(ctx, node); err != nil { - return reconcile.Result{}, fmt.Errorf("terminating node, %w", err) + return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("terminating node, %w", err)) } return reconcile.Result{}, nil } diff --git a/pkg/events/recorder.go b/pkg/events/recorder.go index 20c25954d7..5cae213625 100644 --- a/pkg/events/recorder.go +++ b/pkg/events/recorder.go @@ -31,6 +31,7 @@ type Event struct { Reason string Message string DedupeValues []string + DedupeTimeout time.Duration RateLimiter flowcontrol.RateLimiter } @@ -50,17 +51,24 @@ type recorder struct { cache *cache.Cache } +const defaultDedupeTimeout = 2 * time.Minute + func NewRecorder(r record.EventRecorder) Recorder { return &recorder{ rec: r, - cache: cache.New(120*time.Second, 10*time.Second), + cache: cache.New(defaultDedupeTimeout, 10*time.Second), } } // Publish creates a Kubernetes event using the passed event struct func (r *recorder) Publish(evt Event) { + // Override the timeout if one is set for an event + timeout := 
defaultDedupeTimeout + if evt.DedupeTimeout != 0 { + timeout = evt.DedupeTimeout + } // Dedupe same events that involve the same object and are close together - if len(evt.DedupeValues) > 0 && !r.shouldCreateEvent(evt.dedupeKey()) { + if len(evt.DedupeValues) > 0 && !r.shouldCreateEvent(evt.dedupeKey(), timeout) { return } // If the event is rate-limited, then validate we should create the event @@ -70,10 +78,10 @@ func (r *recorder) Publish(evt Event) { r.rec.Event(evt.InvolvedObject, evt.Type, evt.Reason, evt.Message) } -func (r *recorder) shouldCreateEvent(key string) bool { +func (r *recorder) shouldCreateEvent(key string, timeout time.Duration) bool { if _, exists := r.cache.Get(key); exists { return false } - r.cache.SetDefault(key, nil) + r.cache.Set(key, nil, timeout) return true } diff --git a/pkg/events/suite_test.go b/pkg/events/suite_test.go index 77535f92af..dd3f8a74e1 100644 --- a/pkg/events/suite_test.go +++ b/pkg/events/suite_test.go @@ -103,6 +103,22 @@ var _ = Describe("Dedupe", func() { } Expect(internalRecorder.Calls(events.EvictPod(PodWithUID()).Reason)).To(Equal(1)) }) + It("should allow the dedupe timeout to be overridden", func() { + pod := PodWithUID() + evt := events.EvictPod(pod) + evt.DedupeTimeout = time.Second * 2 + + // Generate a set of events within the dedupe timeout + for i := 0; i < 10; i++ { + eventRecorder.Publish(evt) + } + Expect(internalRecorder.Calls(events.EvictPod(PodWithUID()).Reason)).To(Equal(1)) + + // Wait until after the overridden dedupe timeout + time.Sleep(time.Second * 3) + eventRecorder.Publish(evt) + Expect(internalRecorder.Calls(events.EvictPod(PodWithUID()).Reason)).To(Equal(2)) + }) It("should allow events with different entities to be created", func() { for i := 0; i < 100; i++ { eventRecorder.Publish(events.EvictPod(PodWithUID())) diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index ec8998990e..a2a8ed713d 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -175,7 +175,7 @@ func ExpectCleanedUp(ctx context.Context, c client.Client) { wg := sync.WaitGroup{} namespaces := &v1.NamespaceList{} ExpectWithOffset(1, c.List(ctx, namespaces)).To(Succeed()) - ExpectFinalizersRemoved(ctx, c, &v1.NodeList{}, &v1alpha5.MachineList{}, &v1.PersistentVolumeClaimList{}) + ExpectFinalizersRemovedFromList(ctx, c, &v1.NodeList{}, &v1alpha5.MachineList{}, &v1.PersistentVolumeClaimList{}) for _, object := range []client.Object{ &v1.Pod{}, &v1.Node{}, @@ -200,7 +200,7 @@ func ExpectCleanedUp(ctx context.Context, c client.Client) { wg.Wait() } -func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objectLists ...client.ObjectList) { +func ExpectFinalizersRemovedFromList(ctx context.Context, c client.Client, objectLists ...client.ObjectList) { for _, list := range objectLists { ExpectWithOffset(1, c.List(ctx, list)).To(Succeed()) ExpectWithOffset(1, meta.EachListItem(list, func(o runtime.Object) error { @@ -213,6 +213,15 @@ func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objectLists . 
} } +func ExpectFinalizersRemoved(ctx context.Context, c client.Client, objs ...client.Object) { + for _, obj := range objs { + ExpectWithOffset(1, c.Get(ctx, client.ObjectKeyFromObject(obj), obj)).To(Succeed()) + stored := obj.DeepCopyObject().(client.Object) + obj.SetFinalizers([]string{}) + ExpectWithOffset(1, client.IgnoreNotFound(c.Patch(ctx, obj, client.MergeFrom(stored)))).To(Succeed()) + } +} + func ExpectProvisioned(ctx context.Context, c client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, pods ...*v1.Pod) map[*v1.Pod]*v1.Node { bindings := ExpectProvisionedNoBindingWithOffset(1, ctx, c, provisioner, pods...) podNames := sets.NewString(lo.Map(pods, func(p *v1.Pod, _ int) string { return p.Name })...)
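Note (not part of the patch): as a companion to the InsufficientCapacityError helpers added in pkg/cloudprovider/types.go above, the following standalone Go sketch illustrates the embedded-error pattern they rely on — the wrapper embeds the underlying error, and callers such as the machine launch controller detect it with errors.As via IsInsufficientCapacityError. The snippet re-declares the type and helpers locally using only the standard library so it runs on its own; it mirrors the names in the diff but is an illustration of the pattern, not an excerpt from it.

package main

import (
	"errors"
	"fmt"
)

// InsufficientCapacityError mirrors the wrapper type added in pkg/cloudprovider/types.go:
// it embeds the underlying error so errors.As can match it anywhere in a wrapped chain.
type InsufficientCapacityError struct {
	error
}

func NewInsufficientCapacityError(err error) *InsufficientCapacityError {
	return &InsufficientCapacityError{error: err}
}

func (e *InsufficientCapacityError) Error() string {
	return fmt.Sprintf("insufficient capacity, %s", e.error)
}

func IsInsufficientCapacityError(err error) bool {
	if err == nil {
		return false
	}
	var icErr *InsufficientCapacityError
	return errors.As(err, &icErr)
}

func main() {
	// Simulate a cloud provider Create call failing for lack of capacity, then
	// wrap it once more with %w to show that errors.As still matches through the chain.
	err := fmt.Errorf("creating machine, %w",
		NewInsufficientCapacityError(errors.New("all instance types were unavailable")))

	// The launch controller branches on this check to delete the machine
	// instead of returning the error and requeueing.
	fmt.Println(IsInsufficientCapacityError(err)) // true
}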