Skip to content

Commit

Permalink
fix: keep deleting runner crs before disabling sync runner feature
Browse files Browse the repository at this point in the history
  • Loading branch information
rafalgalaw committed Jan 15, 2024
1 parent 231ce1e commit 3f7b174
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 33 deletions.
36 changes: 17 additions & 19 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,26 @@ func run() error {
return fmt.Errorf("unable to create controller Repository: %w", err)
}

if config.Config.Operator.RunnerReconciliation {
eventChan := make(chan event.GenericEvent)
runnerReconciler := &garmcontroller.RunnerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}

// setup controller so it can reconcile if events from eventChan are queued
if err = runnerReconciler.SetupWithManager(mgr, eventChan,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.RunnerConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Runner: %w", err)
}
eventChan := make(chan event.GenericEvent)
runnerReconciler := &garmcontroller.RunnerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}

// fetch runner instances periodically and enqueue reconcile events for runner ctrl if external system has changed
ctx, cancel := context.WithCancel(ctx)
go runnerReconciler.PollRunnerInstances(ctx, eventChan)
defer cancel()
// setup controller so it can reconcile if events from eventChan are queued
if err = runnerReconciler.SetupWithManager(mgr, eventChan,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.RunnerConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Runner: %w", err)
}

// fetch runner instances periodically and enqueue reconcile events for runner ctrl if external system has changed
ctxWithCancel, cancel := context.WithCancel(ctx)
go runnerReconciler.PollRunnerInstances(ctxWithCancel, cancel, eventChan)
defer cancel()

//+kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand Down
43 changes: 30 additions & 13 deletions internal/controller/runner_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ type RunnerReconciler struct {
//+kubebuilder:rbac:groups=garm-operator.mercedes-benz.com,namespace=xxxxx,resources=runners/finalizers,verbs=update

func (r *RunnerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("Reconciling runners...", "Request", req)

instanceClient := garmClient.NewInstanceClient()

return r.reconcile(ctx, req, instanceClient)
}

Expand Down Expand Up @@ -84,9 +80,17 @@ func (r *RunnerReconciler) reconcile(ctx context.Context, req ctrl.Request, inst
}

func (r *RunnerReconciler) handleCreateRunnerCR(ctx context.Context, req ctrl.Request, fetchErr error, garmRunner *params.Instance) (ctrl.Result, error) {
log := log.FromContext(ctx)

if apierrors.IsNotFound(fetchErr) && garmRunner != nil {
return r.createRunnerCR(ctx, garmRunner, req.Namespace)
}

if apierrors.IsNotFound(fetchErr) {
log.Info("object was not found")
return ctrl.Result{}, nil
}

return ctrl.Result{}, fetchErr
}

Expand Down Expand Up @@ -205,7 +209,7 @@ func (r *RunnerReconciler) SetupWithManager(mgr ctrl.Manager, eventChan chan eve
return c.Watch(&source.Channel{Source: eventChan}, &handler.EnqueueRequestForObject{})
}

func (r *RunnerReconciler) PollRunnerInstances(ctx context.Context, eventChan chan event.GenericEvent) {
func (r *RunnerReconciler) PollRunnerInstances(ctx context.Context, cancel context.CancelFunc, eventChan chan event.GenericEvent) {
log := log.FromContext(ctx)
ticker := time.NewTicker(config.Config.Operator.SyncRunnersInterval)
for {
Expand All @@ -215,18 +219,18 @@ func (r *RunnerReconciler) PollRunnerInstances(ctx context.Context, eventChan ch
close(eventChan)
return
case _ = <-ticker.C:
log.Info("Polling Runners...")
log.Info("Syncing Runners...")
instanceClient := garmClient.NewInstanceClient()

err := r.EnqueueRunnerInstances(ctx, instanceClient, eventChan)
err := r.EnqueueRunnerInstances(ctx, cancel, instanceClient, eventChan)
if err != nil {
log.Error(err, "Failed polling runner instances")
}
}
}
}

func (r *RunnerReconciler) EnqueueRunnerInstances(ctx context.Context, instanceClient garmClient.InstanceClient, eventChan chan event.GenericEvent) error {
func (r *RunnerReconciler) EnqueueRunnerInstances(ctx context.Context, cancel context.CancelFunc, instanceClient garmClient.InstanceClient, eventChan chan event.GenericEvent) error {
pools, err := r.fetchPools(ctx)
if err != nil {
return err
Expand All @@ -243,13 +247,15 @@ func (r *RunnerReconciler) EnqueueRunnerInstances(ctx context.Context, instanceC
}

// compares garm db with RunnerCRs and deletes RunnerCRs not present in garm db
err = r.cleanUpNotMatchingRunnerCRs(ctx, garmRunnerInstances)
err = r.cleanUpNotMatchingRunnerCRs(ctx, cancel, garmRunnerInstances)
if err != nil {
return err
}

// triggers controller to reconcile based on instances in garm db
enqeueRunnerEvents(garmRunnerInstances, eventChan)
if config.Config.Operator.RunnerReconciliation {
// triggers controller to reconcile based on instances in garm db
enqeueRunnerEvents(garmRunnerInstances, eventChan)
}
return nil
}

Expand All @@ -270,13 +276,19 @@ func enqeueRunnerEvents(garmRunnerInstances params.Instances, eventChan chan eve
}
}

func (r *RunnerReconciler) cleanUpNotMatchingRunnerCRs(ctx context.Context, garmRunnerInstances params.Instances) error {
func (r *RunnerReconciler) cleanUpNotMatchingRunnerCRs(ctx context.Context, cancel context.CancelFunc, garmRunnerInstances params.Instances) error {
runnerCRList := &garmoperatorv1alpha1.RunnerList{}
err := r.List(ctx, runnerCRList)
if err != nil {
return err
}

// stop runner sync loop entirely
if !config.Config.Operator.RunnerReconciliation && len(runnerCRList.Items) == 0 {
cancel()
return nil
}

runnerCRNameList := slices.Map(runnerCRList.Items, func(runner garmoperatorv1alpha1.Runner) string {
return runner.Name
})
Expand Down Expand Up @@ -343,10 +355,15 @@ func (r *RunnerReconciler) fetchRunnerInstancesByNamespacedPools(instanceClient
}

func getRunnerDiff(runnerCRs, garmRunners []string) []string {
cache := make(map[string]struct{})
var diff []string

for _, runner := range garmRunners {
cache[runner] = struct{}{}
}

for _, runnerCR := range runnerCRs {
if !slices.Contains(garmRunners, runnerCR) {
if _, found := cache[runnerCR]; !found {
diff = append(diff, runnerCR)
}
}
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/runner_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,12 @@ func TestRunnerReconciler_reconcileDeleteCR(t *testing.T) {
tt.expectGarmRequest(mockInstanceClient.EXPECT())

config.Config.Operator.WatchNamespace = "test-namespace"
config.Config.Operator.RunnerReconciliation = true
fakeChan := make(chan event.GenericEvent)
ctxWithCancel, cancel := context.WithCancel(context.Background())

go func() {
err = reconciler.EnqueueRunnerInstances(context.Background(), mockInstanceClient, fakeChan)
err = reconciler.EnqueueRunnerInstances(ctxWithCancel, cancel, mockInstanceClient, fakeChan)
if (err != nil) != tt.wantErr {
t.Errorf("RunnerReconciler.EnqueueRunnerInstances() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down

0 comments on commit 3f7b174

Please sign in to comment.