Scheduler: Resync reserved periodically to keep state consistent #8451

Merged 1 commit on Feb 11, 2025
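
In short, the change makes the StatefulSet scheduler periodically prune its reserved map (vreplicas that were placed but not yet committed) so that entries for vPods that no longer exist do not linger, especially across leadership changes. The actual implementation is in the diff below; as a rough, self-contained illustration of the pruning pattern only (string keys stand in for the real scheduler types), a minimal sketch:

package main

import "fmt"

// prune drops reservations whose vpod key is no longer in the live set.
func prune(reserved map[string]map[string]int32, live map[string]struct{}) {
	for key := range reserved {
		if _, ok := live[key]; !ok {
			delete(reserved, key)
		}
	}
}

func main() {
	reserved := map[string]map[string]int32{
		"default/kept-vpod":    {"statefulset-0": 2},
		"default/deleted-vpod": {"statefulset-1": 3},
	}
	live := map[string]struct{}{"default/kept-vpod": {}}
	prune(reserved, live)
	fmt.Println(reserved) // map[default/kept-vpod:map[statefulset-0:2]]
}
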
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
// VPod represents virtual replicas placed into real Kubernetes pods
// The scheduler is responsible for placing VPods
type VPod interface {
// GetDeletionTimestamp returns the timestamp at which the vpod was marked for
// deletion; nil or zero means the vpod is not being deleted.
GetDeletionTimestamp() *metav1.Time

// GetKey returns the VPod key (namespace/name).
GetKey() types.NamespacedName

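The new interface method lets the scheduler skip vPods that are being deleted: initReserved (below) ignores any vPod whose deletion timestamp is set, and the test stub at the bottom simply returns nil. This relies on metav1.Time.IsZero treating a nil receiver as the zero time, so the call does not panic on a nil return. A small sanity sketch of that behaviour, assuming the usual k8s.io/apimachinery module:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A nil deletion timestamp reads as "not being deleted": IsZero is nil-safe.
	var notDeleted *metav1.Time
	fmt.Println(notDeleted.IsZero()) // true

	// A set timestamp flags the vpod as being deleted, so the scheduler skips it.
	deletedAt := metav1.NewTime(time.Now())
	fmt.Println(deletedAt.IsZero()) // false
}
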
64 changes: 64 additions & 0 deletions pkg/scheduler/statefulset/scheduler.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
@@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
// replicas is the (cached) number of statefulset replicas.
replicas int32

// isLeader signals whether a given Scheduler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
// bucket where we've been promoted.
isLeader atomic.Bool

// reserved tracks vreplicas that have been placed (i.e. scheduled) but haven't been
// committed yet (i.e. not appearing in vpodLister)
reserved map[types.NamespacedName]map[string]int32
@@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
if !b.Has(ephemeralLeaderElectionObject) {
return nil
}
// The promoted bucket has the ephemeralLeaderElectionObject, so we are the leader.
// Flip the flag after running initReserved.
defer s.isLeader.Store(true)

if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
return v.Promote(b, enq)
@@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {

s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
for _, vPod := range vPods {
if !vPod.GetDeletionTimestamp().IsZero() {
continue
}
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
for _, placement := range vPod.GetPlacements() {
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
@@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
return nil
}

// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
// changes (Promote / Demote).
// initReserved is not enough since the vPod lister can be stale.
func (s *StatefulSetScheduler) resyncReserved() error {
if !s.isLeader.Load() {
return nil
}

vPods, err := s.vpodLister()
if err != nil {
return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
}
vPodsByK := vPodsByKey(vPods)

s.reservedMu.Lock()
defer s.reservedMu.Unlock()

for key := range s.reserved {
vPod, ok := vPodsByK[key]
if !ok || vPod == nil {
delete(s.reserved, key)
}
}

return nil
}

// Demote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
if !b.Has(ephemeralLeaderElectionObject) {
return
}
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
defer s.isLeader.Store(false)

if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
v.Demote(b)
}
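
Taken together, Promote and Demote gate the resync: only the instance holding the ephemeral leader-election bucket flips isLeader to true, and resyncReserved returns immediately on non-leaders. A stripped-down sketch of that gating pattern (illustrative names, not the scheduler's real types):

package main

import (
	"fmt"
	"sync/atomic"
)

type worker struct {
	isLeader atomic.Bool
}

func (w *worker) promote() { w.isLeader.Store(true) }
func (w *worker) demote()  { w.isLeader.Store(false) }

// resync only does work on the current leader; followers return immediately.
func (w *worker) resync() bool {
	if !w.isLeader.Load() {
		return false
	}
	// ... prune state here ...
	return true
}

func main() {
	w := &worker{}
	fmt.Println(w.resync()) // false: not leader yet
	w.promote()
	fmt.Println(w.resync()) // true: the leader performs the resync
	w.demote()
	fmt.Println(w.resync()) // false again after demotion
}
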
@@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
sif.Shutdown()
}()

// Periodically prune reserved entries for vPods that no longer appear in the lister.
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(cfg.RefreshPeriod * 3):
_ = s.resyncReserved()
}
}
}()

return s
}
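
The resync runs on a fixed cadence of three refresh periods (cfg.RefreshPeriod * 3 above) and stops when the scheduler's context is cancelled. A self-contained sketch of that select/time.After loop, with a hard-coded period standing in for the real configuration:

package main

import (
	"context"
	"fmt"
	"time"
)

// runPeriodic calls work every period and returns when ctx is cancelled.
func runPeriodic(ctx context.Context, period time.Duration, work func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(period):
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	runPeriodic(ctx, 300*time.Millisecond, func() {
		fmt.Println("resync reserved")
	})
	fmt.Println("context cancelled, loop stopped")
}

time.After allocates a fresh timer on every iteration; a time.Ticker would avoid that, but at a multi-second refresh period the overhead is negligible and the form shown keeps the cancellation path simple.
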

@@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
}
return placements
}

// vPodsByKey indexes the given vPods by their namespace/name key.
func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
for _, vPod := range vPods {
r[vPod.GetKey()] = vPod
}
return r
}
4 changes: 4 additions & 0 deletions pkg/scheduler/testing/vpod.go
@@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme
}
}

// GetDeletionTimestamp returns nil: the test vpod is never treated as deleted.
func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time {
return nil
}

func (d *sampleVPod) GetKey() types.NamespacedName {
return d.key
}