Skip to content

Commit

Permalink
atc: remove Inprogress status
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmdm committed Nov 24, 2024
1 parent afe748a commit cc3073f
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions cmd/atc/atc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -33,19 +34,20 @@ type ATC struct {
Airway schema.GroupKind
CacheDir string
Concurrency int
Cleanups map[string]func()
Locks *sync.Map
Prev map[string]any

cleanups map[string]func()
locks *sync.Map
prev map[string]any
}

func MakeATC(airway schema.GroupKind, cacheDir string, concurrency int) (ATC, func()) {
atc := ATC{
Airway: airway,
CacheDir: cacheDir,
Concurrency: concurrency,
Cleanups: map[string]func(){},
Locks: &sync.Map{},
Prev: map[string]any{},
cleanups: map[string]func(){},
locks: &sync.Map{},
prev: map[string]any{},
}
return atc, atc.Teardown
}
Expand All @@ -68,19 +70,28 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (result ctrl.Res
return ctrl.Result{}, fmt.Errorf("failed to get airway %s: %w", event.Name, err)
}

if prevSpec := atc.Prev[airway.GetName()]; prevSpec != nil && reflect.DeepEqual(airwaySpec(airway), prevSpec) {
if prevSpec := atc.prev[airway.GetName()]; prevSpec != nil && reflect.DeepEqual(airwaySpec(airway), prevSpec) {
ctrl.Logger(ctx).Info("airway status update: skip reconcile loop")
return ctrl.Result{}, nil
}

defer func() {
if err == nil {
atc.Prev[airway.GetName()] = airwaySpec(airway)
atc.prev[airway.GetName()] = airwaySpec(airway)
}
}()

airwayStatus := func(status, msg string) {
_ = unstructured.SetNestedMap(airway.Object, map[string]any{"Status": status, "Msg": msg}, "status")
prev := airway.Object["status"]
next := map[string]any{"Status": status, "Msg": msg}
if reflect.DeepEqual(prev, next) {
return
}

ctrl.Logger(ctx).Info("updating status", slog.Any("prev", prev), slog.Any("next", next))

_ = unstructured.SetNestedMap(airway.Object, next, "status")

updated, err := airwayIntf.UpdateStatus(ctx, airway.DeepCopy(), metav1.UpdateOptions{FieldManager: "yoke.cd/atc"})
if err != nil {
ctrl.Logger(ctx).Error("failed to update airway status", "error", err)
Expand All @@ -89,8 +100,6 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (result ctrl.Res
airway = updated
}

airwayStatus("InProgress", "Reconciliation started")

defer func() {
if err != nil {
airwayStatus("Error", err.Error())
Expand All @@ -111,7 +120,7 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (result ctrl.Res
}

mutex := func() *sync.RWMutex {
value, _ := atc.Locks.LoadOrStore(airway.GetName(), new(sync.RWMutex))
value, _ := atc.locks.LoadOrStore(airway.GetName(), new(sync.RWMutex))
return value.(*sync.RWMutex)
}()

Expand Down Expand Up @@ -250,15 +259,15 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (result ctrl.Res
return ctrl.Result{}, nil
}

if cleanup := atc.Cleanups[airway.GetName()]; cleanup != nil {
if cleanup := atc.cleanups[airway.GetName()]; cleanup != nil {
cleanup()
}

flightCtx, cancel := context.WithCancel(ctx)

done := make(chan struct{})

atc.Cleanups[airway.GetName()] = func() {
atc.cleanups[airway.GetName()] = func() {
cancel()
ctrl.Logger(ctx).Info("Flight controller canceled. Shutdown in progress.")
<-done
Expand All @@ -284,7 +293,7 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (result ctrl.Res
}

func (atc ATC) Teardown() {
for _, cleanup := range atc.Cleanups {
for _, cleanup := range atc.cleanups {
cleanup()
}
}
Expand Down

0 comments on commit cc3073f

Please sign in to comment.