diff --git a/pkg/autopilot/controller/root_worker.go b/pkg/autopilot/controller/root_worker.go index 88417920c7cb..d8ea82c1b0e3 100644 --- a/pkg/autopilot/controller/root_worker.go +++ b/pkg/autopilot/controller/root_worker.go @@ -17,22 +17,27 @@ package controller import ( "context" "fmt" + "slices" "time" + autopilotv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2" apcli "github.com/k0sproject/k0s/pkg/autopilot/client" apdel "github.com/k0sproject/k0s/pkg/autopilot/controller/delegate" aproot "github.com/k0sproject/k0s/pkg/autopilot/controller/root" apsig "github.com/k0sproject/k0s/pkg/autopilot/controller/signal" + k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme" + "github.com/k0sproject/k0s/pkg/kubernetes/watch" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - k8sretry "k8s.io/client-go/util/retry" + cr "sigs.k8s.io/controller-runtime" crman "sigs.k8s.io/controller-runtime/pkg/manager" crmetricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" - "github.com/avast/retry-go" "github.com/sirupsen/logrus" ) @@ -69,49 +74,127 @@ func (w *rootWorker) Run(ctx context.Context) error { HealthProbeBindAddress: w.cfg.HealthProbeBindAddr, } - var mgr crman.Manager - if err := retry.Do( - func() (err error) { - mgr, err = cr.NewManager(w.clientFactory.RESTConfig(), managerOpts) - return err - }, - retry.Context(ctx), - retry.LastErrorOnly(true), - retry.Delay(1*time.Second), - retry.OnRetry(func(attempt uint, err error) { - logger.WithError(err).Debugf("Failed to start controller manager in attempt #%d, retrying after backoff", attempt+1) - }), - ); err != nil { - logger.WithError(err).Fatal("unable to start controller manager") + // We need to wait until all autopilot CRDs are established. 
+ var waitErr error + if pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) { + waitErr = w.waitForCRDs(ctx) + return waitErr == nil, nil + }); pollErr != nil { + if waitErr == nil { + return pollErr + } + + return fmt.Errorf("%w: %w", pollErr, waitErr) + } + + clusterID, err := w.getClusterID(ctx) + if err != nil { + return err + } + + mgr, err := cr.NewManager(w.clientFactory.RESTConfig(), managerOpts) + if err != nil { + return fmt.Errorf("unable to start controller manager: %w", err) + } + + if err := RegisterIndexers(ctx, mgr, "worker"); err != nil { + return fmt.Errorf("unable to register indexers: %w", err) + } + + if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil { + return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err) } - // In some cases, we need to wait on the worker side until controller deploys all autopilot CRDs - return k8sretry.OnError(wait.Backoff{ - Steps: 120, - Duration: 1 * time.Second, - Factor: 1.0, - Jitter: 0.1, - }, func(err error) bool { - return true - }, func() error { - clusterID, err := w.getClusterID(ctx) + // The controller-runtime start blocks until the context is cancelled. + if err := mgr.Start(ctx); err != nil { + return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err) + } + + return nil +} + +func (w *rootWorker) waitForCRDs(ctx context.Context) error { + // Collect the kinds the scheme knows for the autopilot group version. + var kinds []string + gv := autopilotv1beta2.SchemeGroupVersion + for kind := range k0sscheme.Scheme.KnownTypes(gv) { + // For some reason, the scheme also returns types from core/v1. Filter + // those out by only adding kinds which are _only_ in the autopilot + // group, and not in some other group as well. 
The only way to get all + // the GVKs for a certain type is by creating a new instance of that + // type and then asking the scheme about it. + obj, err := k0sscheme.Scheme.New(gv.WithKind(kind)) if err != nil { return err } - - if err := RegisterIndexers(ctx, mgr, "worker"); err != nil { - return fmt.Errorf("unable to register indexers: %w", err) + gvks, _, err := k0sscheme.Scheme.ObjectKinds(obj) + if err != nil { + return err } - if err := apsig.RegisterControllers(ctx, logger, mgr, apdel.NodeControllerDelegate(), w.cfg.K0sDataDir, clusterID); err != nil { - return fmt.Errorf("unable to register 'controlnodes' controllers: %w", err) - } - // The controller-runtime start blocks until the context is cancelled. - if err := mgr.Start(ctx); err != nil { - return fmt.Errorf("unable to run controller-runtime manager for workers: %w", err) + // Keep the kind only if none of its GVKs belongs to a group other + // than the autopilot group. + if !slices.ContainsFunc(gvks, func(gvk schema.GroupVersionKind) bool { + return gvk.Group != autopilotv1beta2.GroupName + }) { + kinds = append(kinds, kind) } - return nil - }) + } + + client, err := w.clientFactory.GetExtensionClient() + if err != nil { + return err + } + + // Watch all the CRDs until all the required ones are established. + slices.Sort(kinds) // cosmetic: keeps the kinds listed in the error message below in a stable order + if err = watch.CRDs(client.CustomResourceDefinitions()). + WithErrorCallback(func(err error) (time.Duration, error) { + if retryAfter, e := watch.IsRetryable(err); e == nil { + w.log.WithError(err).Info( + "Transient error while watching for CRDs", + ", starting over after ", retryAfter, " ...", + ) + return retryAfter, nil + } + + retryAfter := 10 * time.Second + w.log.WithError(err).Error( + "Error while watching CRDs", + ", starting over after ", retryAfter, " ...", + ) + return retryAfter, nil + }). 
+ Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) { + if item.Spec.Group != autopilotv1beta2.GroupName { + return false, nil // Not an autopilot CRD. + } + + // Find the status of the CRD's Established condition; it stays at its zero value if the condition is missing. + var established apiextensionsv1.ConditionStatus + for _, cond := range item.Status.Conditions { + if cond.Type == apiextensionsv1.Established { + established = cond.Status + break + } + } + + if established != apiextensionsv1.ConditionTrue { + return false, nil // CRD not yet established. + } + + // Remove the CRD's kind and list kind from the kinds still being waited for. + kinds = slices.DeleteFunc(kinds, func(kind string) bool { + return kind == item.Spec.Names.Kind || kind == item.Spec.Names.ListKind + }) + + // If the list is empty, all required CRDs are established. + return len(kinds) < 1, nil + }); err != nil { + return fmt.Errorf("while waiting for Autopilot CRDs %v to become established: %w", kinds, err) + } + + return nil } func (w *rootWorker) getClusterID(ctx context.Context) (string, error) {