Skip to content

Commit

Permalink
fix: Use UUID as a precondition when calling the eviction API (#998)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis authored Feb 9, 2024
1 parent e0dc627 commit f7c5a89
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 58 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/deckarep/golang-set v1.8.0
github.com/docker/docker v25.0.2+incompatible
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/docker/docker v25.0.2+incompatible h1:/OaKeauroa10K4Nqavw4zlhcDq/WBcPMc5DbjOGgozY=
github.com/docker/docker v25.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
Expand Down
1 change: 0 additions & 1 deletion kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func main() {
WithControllers(ctx, controllers.NewControllers(
op.Clock,
op.GetClient(),
op.KubernetesInterface,
state.NewCluster(op.Clock, op.GetClient(), cloudProvider),
op.EventRecorder,
cloudProvider,
Expand Down
6 changes: 2 additions & 4 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package controllers

import (
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -47,14 +46,13 @@ import (
func NewControllers(
clock clock.Clock,
kubeClient client.Client,
kubernetesInterface kubernetes.Interface,
cluster *state.Cluster,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {

p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

return []controller.Controller{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
})
Expand Down
59 changes: 56 additions & 3 deletions pkg/controllers/node/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var _ = BeforeSuite(func() {

cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
queue = terminator.NewQueue(env.Client, recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder)
})

Expand Down Expand Up @@ -321,7 +321,7 @@ var _ = Describe("Termination", func() {

// Expect podNoEvict to fail eviction due to PDB, and be retried
Eventually(func() int {
return queue.NumRequeues(client.ObjectKeyFromObject(podNoEvict))
return queue.NumRequeues(terminator.NewQueueKey(podNoEvict))
}).Should(BeNumerically(">=", 1))

// Delete pod to simulate successful eviction
Expand Down Expand Up @@ -581,6 +581,59 @@ var _ = Describe("Termination", func() {
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotFound(ctx, env.Client, node)
})
It("should not evict a new pod with the same name using the old pod's eviction queue key", func() {
pod := test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Don't trigger a call into the queue to make sure that we effectively aren't triggering eviction
// We'll use this to try to leave pods in the queue

// Expect node to exist and be draining
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// Delete the pod directly to act like something else is doing the pod termination
ExpectDeleted(ctx, env.Client, pod)

// Requeue the termination controller to completely delete the node
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(terminator.NewQueueKey(pod)))

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{v1beta1.TerminationFinalizer},
},
})
pod = test.Pod(test.PodOptions{
NodeName: node.Name,
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
OwnerReferences: defaultOwnerRefs,
},
})
ExpectApplied(ctx, env.Client, node, pod)

// Trigger eviction queue with the pod key still in it
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

Consistently(func(g Gomega) {
g.Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(pod), pod)).To(Succeed())
g.Expect(pod.DeletionTimestamp.IsZero()).To(BeTrue())
}, ReconcilerPropagationTime, RequestInterval).Should(Succeed())
})
})
Context("Metrics", func() {
It("should fire the terminationSummary metric when deleting nodes", func() {
Expand Down Expand Up @@ -609,7 +662,7 @@ var _ = Describe("Termination", func() {
func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) {
GinkgoHelper()
for _, pod := range pods {
Expect(e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse())
Expect(e.Has(terminator.NewQueueKey(pod))).To(BeFalse())
}
}

Expand Down
81 changes: 52 additions & 29 deletions pkg/controllers/node/termination/terminator/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"fmt"
"time"

set "github.com/deckarep/golang-set"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -62,19 +62,31 @@ func IsNodeDrainError(err error) bool {
return errors.As(err, &nodeDrainErr)
}

type QueueKey struct {
types.NamespacedName
UID types.UID
}

func NewQueueKey(pod *v1.Pod) QueueKey {
return QueueKey{
NamespacedName: client.ObjectKeyFromObject(pod),
UID: pod.UID,
}
}

type Queue struct {
workqueue.RateLimitingInterface
set.Set
sets.Set[QueueKey]

coreV1Client corev1.CoreV1Interface
recorder events.Recorder
kubeClient client.Client
recorder events.Recorder
}

func NewQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *Queue {
func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
queue := &Queue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: set.NewSet(),
coreV1Client: coreV1Client,
Set: sets.New[QueueKey](),
kubeClient: kubeClient,
recorder: recorder,
}
return queue
Expand All @@ -91,9 +103,10 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder
// Add adds pods to the Queue
func (q *Queue) Add(pods ...*v1.Pod) {
for _, pod := range pods {
if nn := client.ObjectKeyFromObject(pod); !q.Set.Contains(nn) {
q.Set.Add(nn)
q.RateLimitingInterface.Add(nn)
qk := NewQueueKey(pod)
if !q.Set.Has(qk) {
q.Set.Insert(qk)
q.RateLimitingInterface.Add(qk)
}
}
}
Expand All @@ -102,53 +115,63 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
// Check if the queue is empty. client-go recommends not using this function to gate the subsequent
// get call, but since we're popping items off the queue synchronously, there should be no synchonization
// issues.
if q.Len() == 0 {
if q.RateLimitingInterface.Len() == 0 {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// Get pod from queue. This waits until queue is non-empty.
item, shutdown := q.RateLimitingInterface.Get()
if shutdown {
return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown")
}
nn := item.(types.NamespacedName)
defer q.RateLimitingInterface.Done(nn)
qk := item.(QueueKey)
defer q.RateLimitingInterface.Done(qk)
// Evict pod
if q.Evict(ctx, nn) {
q.RateLimitingInterface.Forget(nn)
q.Set.Remove(nn)
if q.Evict(ctx, qk) {
q.RateLimitingInterface.Forget(qk)
q.Set.Delete(qk)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
// Requeue pod if eviction failed
q.RateLimitingInterface.AddRateLimited(nn)
q.RateLimitingInterface.AddRateLimited(qk)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

// Evict returns true if successful eviction call, and false if not an eviction-related error
func (q *Queue) Evict(ctx context.Context, nn types.NamespacedName) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn))
if err := q.coreV1Client.Pods(nn.Namespace).EvictV1(ctx, &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace},
}); err != nil {
func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", key.NamespacedName))
if err := q.kubeClient.SubResource("eviction").Create(ctx,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}},
&policyv1.Eviction{
DeleteOptions: &metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{
UID: lo.ToPtr(key.UID),
},
},
}); err != nil {
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
if apierrors.IsNotFound(err) { // 404
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
// 404 - The pod no longer exists
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160
// 409 - The pod exists, but it is not the same pod that we initiated the eviction on
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318
return true
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
q.recorder.Publish(terminatorevents.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", nn.Namespace, nn.Name)))
Name: key.Name,
Namespace: key.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name)))
return false
}
logging.FromContext(ctx).Errorf("evicting pod, %s", err)
return false
}
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}

func (q *Queue) Reset() {
q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
q.Set = set.NewSet()
q.Set = sets.New[QueueKey]()
}
21 changes: 13 additions & 8 deletions pkg/controllers/node/termination/terminator/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"github.com/samber/lo"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -57,7 +58,7 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{Drift: lo.ToPtr(true)}}))
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
queue = terminator.NewQueue(env.Client, recorder)
})

var _ = AfterSuite(func() {
Expand Down Expand Up @@ -91,13 +92,17 @@ var _ = Describe("Eviction/Queue", func() {

Context("Eviction API", func() {
It("should succeed with no event when the pod is not found", func() {
ExpectApplied(ctx, env.Client, pdb)
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue())
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with no event when the pod UID conflicts", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue())
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with an evicted event when there are no PDBs", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue())
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should succeed with no event when there are PDBs that allow an eviction", func() {
Expand All @@ -106,12 +111,12 @@ var _ = Describe("Eviction/Queue", func() {
MaxUnavailable: &intstr.IntOrString{IntVal: 1},
})
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeTrue())
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should return a NodeDrainError event when a PDB is blocking", func() {
ExpectApplied(ctx, env.Client, pdb, pod)
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeFalse())
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
Expect(recorder.Calls("FailedDraining")).To(Equal(1))
})
It("should fail when two PDBs refer to the same pod", func() {
Expand All @@ -120,7 +125,7 @@ var _ = Describe("Eviction/Queue", func() {
MaxUnavailable: &intstr.IntOrString{IntVal: 0},
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace})).To(BeFalse())
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
})
})
})
Loading

0 comments on commit f7c5a89

Please sign in to comment.