Skip to content

Commit

Permalink
Scheduler: Resync reserved periodically to keep state consistent
Browse files Browse the repository at this point in the history
Add resyncReserved removes deleted vPods from reserved to keep the
state consistent when leadership changes (Promote / Demote).

`initReserved` is not enough since the vPod lister can be stale.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Feb 11, 2025
1 parent 4a6e7d2 commit 4c53339
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
// VPod represents virtual replicas placed into real Kubernetes pods
// The scheduler is responsible for placing VPods
type VPod interface {
GetDeletionTimestamp() *metav1.Time

// GetKey returns the VPod key (namespace/name).
GetKey() types.NamespacedName

Expand Down
64 changes: 64 additions & 0 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
// replicas is the (cached) number of statefulset replicas.
replicas int32

// isLeader signals whether a given Scheduler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
// bucket where we've been promoted.
isLeader atomic.Bool

// reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been
// committed yet (ie. not appearing in vpodLister)
reserved map[types.NamespacedName]map[string]int32
Expand All @@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
if !b.Has(ephemeralLeaderElectionObject) {
return nil
}
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
// Flip the flag after running initReserved.
defer s.isLeader.Store(true)

if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
return v.Promote(b, enq)
Expand All @@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {

s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
for _, vPod := range vPods {
if !vPod.GetDeletionTimestamp().IsZero() {
continue
}
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
for _, placement := range vPod.GetPlacements() {
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
Expand All @@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
return nil
}

// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
// changes (Promote / Demote).
// initReserved is not enough since the vPod lister can be stale.
func (s *StatefulSetScheduler) resyncReserved() error {
if !s.isLeader.Load() {
return nil
}

vPods, err := s.vpodLister()
if err != nil {
return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
}
vPodsByK := vPodsByKey(vPods)

s.reservedMu.Lock()
defer s.reservedMu.Unlock()

for key := range s.reserved {
vPod, ok := vPodsByK[key]
if !ok || vPod == nil {
delete(s.reserved, key)
}
}

return nil
}

// Demote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
if !b.Has(ephemeralLeaderElectionObject) {
return
}
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
defer s.isLeader.Store(false)

if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
v.Demote(b)
}
Expand Down Expand Up @@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
sif.Shutdown()
}()

go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(cfg.RefreshPeriod * 3):
_ = s.resyncReserved()
}
}
}()

return s
}

Expand Down Expand Up @@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
}
return placements
}

func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
for _, vPod := range vPods {
r[vPod.GetKey()] = vPod
}
return r
}
4 changes: 4 additions & 0 deletions pkg/scheduler/testing/vpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme
}
}

func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time {
return nil
}

func (d *sampleVPod) GetKey() types.NamespacedName {
return d.key
}
Expand Down

0 comments on commit 4c53339

Please sign in to comment.