fix: Add locking/unlocking to set Add(), Delete(), and Reset() (#…
jonathan-innis authored Feb 15, 2024
1 parent 3b9758b commit c5ffa75
Showing 3 changed files with 56 additions and 23 deletions.
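
The change makes the eviction queue's internal tracking set safe for concurrent use: `sets.Set` from `k8s.io/apimachinery` is a plain Go map, so unsynchronized access from `Add()`, the delete in `Reconcile()`, and `Reset()` is a data race. The fix unexports the set, guards it with a `sync.Mutex`, and adds a locked `Has()` accessor so the tests no longer reach into the set directly; a new test (third file below) hammers `Add()` while the queue is being reconciled.

The sketch below is a standalone illustration of the kind of unsynchronized access the commit removes, not Karpenter code; it assumes `k8s.io/apimachinery` is available on the module path.

```go
// Two goroutines mutating the same sets.Set (a plain Go map) without
// synchronization. Running this with `go run -race .` reports a DATA RACE;
// guarding the Insert/Delete calls with a shared sync.Mutex, as the commit
// does for the eviction queue, makes the report go away.
package main

import (
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	s := sets.New[string]()
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer, analogous to Queue.Add()
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			s.Insert("pod-a")
		}
	}()
	go func() { // concurrent mutator, analogous to the set deletion in Reconcile()
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			s.Delete("pod-a")
		}
	}()
	wg.Wait()
}
```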
25 changes: 10 additions & 15 deletions pkg/controllers/node/termination/suite_test.go
@@ -22,6 +22,9 @@ import (
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/samber/lo"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -36,14 +39,13 @@ import (
"sigs.k8s.io/karpenter/pkg/operator/scheme"
"sigs.k8s.io/karpenter/pkg/test"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"

. "knative.dev/pkg/logging/testing"

. "sigs.k8s.io/karpenter/pkg/test/expectations"
)

@@ -173,7 +175,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotEnqueuedForEviction(queue, podSkip)
Expect(queue.Has(podSkip)).To(BeFalse())
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect node to exist and be draining
@@ -201,7 +203,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotEnqueuedForEviction(queue, podSkip)
Expect(queue.Has(podSkip)).To(BeFalse())
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect node to exist and be draining
@@ -211,7 +213,7 @@ var _ = Describe("Termination", func() {
EventuallyExpectTerminating(ctx, env.Client, podEvict)
ExpectDeleted(ctx, env.Client, podEvict)

ExpectNotEnqueuedForEviction(queue, podSkip)
Expect(queue.Has(podSkip)).To(BeFalse())

// Reconcile to delete node
node = ExpectNodeExists(ctx, env.Client, node.Name)
@@ -478,7 +480,7 @@ var _ = Describe("Termination", func() {
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect mirror pod to not be queued for eviction
ExpectNotEnqueuedForEviction(queue, podNoEvict)
Expect(queue.Has(podNoEvict)).To(BeFalse())

// Expect podEvict to be enqueued for eviction then be successful
EventuallyExpectTerminating(ctx, env.Client, podEvict)
@@ -609,7 +611,7 @@ var _ = Describe("Termination", func() {
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))

// Expect that the old pod's key still exists in the queue
Expect(queue.Has(terminator.NewQueueKey(pod)))
Expect(queue.Has(pod))

// Re-create the pod and node, it should now have the same name, but a different UUID
node = test.Node(test.NodeOptions{
@@ -659,13 +661,6 @@ var _ = Describe("Termination", func() {
})
})

func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) {
GinkgoHelper()
for _, pod := range pods {
Expect(e.Has(terminator.NewQueueKey(pod))).To(BeFalse())
}
}

func ExpectNodeWithNodeClaimDraining(c client.Client, nodeName string) *v1.Node {
GinkgoHelper()
node := ExpectNodeExists(ctx, c, nodeName)
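Since the set is now unexported and guarded by the queue's mutex (see eviction.go below), the test suite stops inspecting it through the `ExpectNotEnqueuedForEviction` helper and instead asserts via the new `queue.Has(pod)` accessor, which performs the membership check under the lock.
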
30 changes: 24 additions & 6 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/samber/lo"
@@ -76,7 +77,9 @@ func NewQueueKey(pod *v1.Pod) QueueKey {

type Queue struct {
workqueue.RateLimitingInterface
sets.Set[QueueKey]

mu sync.Mutex
set sets.Set[QueueKey]

kubeClient client.Client
recorder events.Recorder
@@ -85,7 +88,7 @@ type Queue struct {
func NewQueue(kubeClient client.Client, recorder events.Recorder) *Queue {
queue := &Queue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: sets.New[QueueKey](),
set: sets.New[QueueKey](),
kubeClient: kubeClient,
recorder: recorder,
}
@@ -102,15 +105,25 @@ func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder

// Add adds pods to the Queue
func (q *Queue) Add(pods ...*v1.Pod) {
q.mu.Lock()
defer q.mu.Unlock()

for _, pod := range pods {
qk := NewQueueKey(pod)
if !q.Set.Has(qk) {
q.Set.Insert(qk)
if !q.set.Has(qk) {
q.set.Insert(qk)
q.RateLimitingInterface.Add(qk)
}
}
}

func (q *Queue) Has(pod *v1.Pod) bool {
q.mu.Lock()
defer q.mu.Unlock()

return q.set.Has(NewQueueKey(pod))
}

func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
// Check if the queue is empty. client-go recommends not using this function to gate the subsequent
// get call, but since we're popping items off the queue synchronously, there should be no synchronization
@@ -128,7 +141,9 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R
// Evict pod
if q.Evict(ctx, qk) {
q.RateLimitingInterface.Forget(qk)
q.Set.Delete(qk)
q.mu.Lock()
q.set.Delete(qk)
q.mu.Unlock()
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
// Requeue pod if eviction failed
@@ -172,6 +187,9 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
}

func (q *Queue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()

q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
q.Set = sets.New[QueueKey]()
q.set = sets.New[QueueKey]()
}
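
One detail worth noting: `Reconcile()` takes the lock only around the `q.set.Delete(qk)` mutation rather than holding it for the whole pass, presumably so the eviction API call itself is never serialized behind the mutex. A minimal, self-contained sketch of that lock-scoping shape, using illustrative names rather than Karpenter's types:

```go
// Illustrative only: the lock protects the brief set mutation, while the
// potentially slow work (here, the evict callback) runs without it.
package main

import "sync"

type trackingQueue struct {
	mu  sync.Mutex
	set map[string]struct{}
}

// processOne mirrors the locking shape of Reconcile() above: evict runs
// unlocked, and the mutex is held only while the set is updated.
func (q *trackingQueue) processOne(key string, evict func(string) bool) {
	if evict(key) {
		q.mu.Lock()
		delete(q.set, key)
		q.mu.Unlock()
	}
}

func main() {
	q := &trackingQueue{set: map[string]struct{}{"pod-a": {}}}
	q.processOne("pod-a", func(string) bool { return true })
}
```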
24 changes: 22 additions & 2 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -23,15 +23,14 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"

"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"

"sigs.k8s.io/karpenter/pkg/apis"
@@ -127,5 +126,26 @@ var _ = Describe("Eviction/Queue", func() {
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
cancelCtx, cancel := context.WithCancel(ctx)
DeferCleanup(func() {
cancel()
})

// Keep calling Reconcile() for the entirety of this test
go func() {
for {
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
if cancelCtx.Err() != nil {
return
}
}
}()

// Ensure that we add enough pods to the queue while we are pulling items off of the queue (enough to trigger a DATA RACE)
for i := 0; i < 10000; i++ {
queue.Add(test.Pod())
}
})
})
})
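
The added test is effectively a race-detector probe rather than a functional assertion: a goroutine repeatedly reconciles the queue while the main goroutine calls `Add()` 10,000 times. With the previously unguarded `sets.Set`, running the suite under Go's race detector (`go test -race`) reports the concurrent map access as a DATA RACE; with the mutex in place it should pass cleanly.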
