Skip to content

Commit

Permalink
atc: add atc teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmdm committed Nov 24, 2024
1 parent b822c2c commit 7033ef6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/atc-installer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strconv"

"golang.org/x/term"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down
37 changes: 23 additions & 14 deletions cmd/atc/atc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type ATC struct {
Airway schema.GroupKind
Concurrency int
Cleanups *sync.Map
Cleanups map[string]func()
Locks *sync.Map
}

Expand Down Expand Up @@ -204,24 +204,27 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (ctrl.Result, er
return ctrl.Result{}, nil
}

if cleanup := atc.Cleanups[airway.GetName()]; cleanup != nil {
cleanup()
}

flightCtx, cancel := context.WithCancel(ctx)

done := make(chan struct{})

atc.Cleanups[airway.GetName()] = func() {
cancel()
ctrl.Logger(ctx).Info("Flight controller canceled. Shutdown in progress.")
<-done
}

go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

done := make(chan struct{})
defer close(done)

if cleanup, loaded := atc.Cleanups.Swap(airway.GetName(), func() {
cancel()
ctrl.Logger(ctx).Info("waiting for previous flight controller to shutdown")
<-done
}); loaded {
cleanup.(func())()
}

if err := flightController.ProcessGroupKind(ctx, flightGK, flightHander); err != nil {
if err := flightController.ProcessGroupKind(flightCtx, flightGK, flightHander); err != nil {
if errors.Is(err, context.Canceled) {
ctrl.Logger(ctx).Info("flight controller canceled", "groupKind", flightGK.String())
ctrl.Logger(ctx).Info("Flight controller cancled. Shutdown complete.", "groupKind", flightGK.String())
return
}
ctrl.Logger(ctx).Error("could not process group kind", "error", err)
Expand All @@ -230,3 +233,9 @@ func (atc ATC) Reconcile(ctx context.Context, event ctrl.Event) (ctrl.Result, er

return ctrl.Result{}, nil
}

func (atc ATC) Teardown() {
for _, cleanup := range atc.Cleanups {
cleanup()
}
}
4 changes: 3 additions & 1 deletion cmd/atc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ func run() (err error) {
atc := ATC{
Airway: schema.GroupKind{Group: "yoke.cd", Kind: "Airway"},
Concurrency: cfg.Concurrency,
Cleanups: &sync.Map{},
Cleanups: map[string]func(){},
Locks: &sync.Map{},
}

defer atc.Teardown()

return controller.ProcessGroupKind(ctx, atc.Airway, atc.Reconcile)
}

0 comments on commit 7033ef6

Please sign in to comment.