Skip to content

Commit

Permalink
Merge pull request #417 from akrejcir/watch-crds
Browse files Browse the repository at this point in the history
Watch required CRDs, and restart operator if they are removed
  • Loading branch information
kubevirt-bot authored Oct 10, 2022
2 parents 05aa846 + 37d0012 commit c336c0b
Show file tree
Hide file tree
Showing 22 changed files with 1,088 additions and 290 deletions.
101 changes: 0 additions & 101 deletions controllers/crd_controller.go

This file was deleted.

84 changes: 0 additions & 84 deletions controllers/finishable/finishable.go

This file was deleted.

12 changes: 6 additions & 6 deletions controllers/services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func ServiceObject(namespace string) *v1.Service {
// Annotation to generate RBAC roles to read and modify services
// +kubebuilder:rbac:groups="",resources=services,verbs=get;watch;list;create;update;delete

func CreateServiceController(mgr ctrl.Manager) (*serviceReconciler, error) {
return newServiceReconciler(mgr)
func CreateServiceController(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) {
return newServiceReconciler(ctx, mgr)
}

func (r *serviceReconciler) Start(ctx context.Context, mgr ctrl.Manager) error {
Expand Down Expand Up @@ -105,24 +105,24 @@ type serviceReconciler struct {
deployment *apps.Deployment
}

func getOperatorDeployment(namespace string, apiReader client.Reader) (*apps.Deployment, error) {
func getOperatorDeployment(ctx context.Context, namespace string, apiReader client.Reader) (*apps.Deployment, error) {
objKey := client.ObjectKey{Namespace: namespace, Name: OperatorName}
var deployment apps.Deployment
err := apiReader.Get(context.TODO(), objKey, &deployment)
err := apiReader.Get(ctx, objKey, &deployment)
if err != nil {
return nil, fmt.Errorf("getOperatorDeployment, get deployment: %w", err)
}
return &deployment, nil
}

func newServiceReconciler(mgr ctrl.Manager) (*serviceReconciler, error) {
func newServiceReconciler(ctx context.Context, mgr ctrl.Manager) (*serviceReconciler, error) {
logger := ctrl.Log.WithName("controllers").WithName("Resources")
namespace, err := common.GetOperatorNamespace(logger)
if err != nil {
return nil, fmt.Errorf("in newServiceReconciler: %w", err)
}

deployment, err := getOperatorDeployment(namespace, mgr.GetAPIReader())
deployment, err := getOperatorDeployment(ctx, namespace, mgr.GetAPIReader())
if err != nil {
return nil, fmt.Errorf("in newServiceReconciler: %w", err)
}
Expand Down
88 changes: 38 additions & 50 deletions controllers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"path/filepath"

extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

"kubevirt.io/ssp-operator/internal/common"
crd_watch "kubevirt.io/ssp-operator/internal/crd-watch"
"kubevirt.io/ssp-operator/internal/operands"
common_templates "kubevirt.io/ssp-operator/internal/operands/common-templates"
data_sources "kubevirt.io/ssp-operator/internal/operands/data-sources"
Expand All @@ -19,11 +18,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
// Need to watch CRDs
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

func CreateAndStartReconciler(ctx context.Context, mgr controllerruntime.Manager) error {
templatesFile := filepath.Join(templateBundleDir, "common-templates-"+common_templates.Version+".yaml")
templatesBundle, err := template_bundle.ReadBundle(templatesFile)
if err != nil {
return err
return fmt.Errorf("failed to read template bundle: %w", err)
}

sspOperands := []operands.Operand{
Expand All @@ -39,79 +41,65 @@ func CreateAndSetupReconciler(mgr controllerruntime.Manager) error {
requiredCrds = append(requiredCrds, sspOperands[i].RequiredCrds()...)
}

// Check if all needed CRDs exist
crdList := &extv1.CustomResourceDefinitionList{}
err = mgr.GetAPIReader().List(context.TODO(), crdList)
mgrCtx, cancel := context.WithCancel(ctx)
defer cancel()

crdWatch := crd_watch.New(requiredCrds...)
// Cleanly stops the manager and exit. The pod will be restarted.
crdWatch.AllCrdsAddedHandler = cancel
crdWatch.SomeCrdRemovedHandler = cancel

err = crdWatch.Init(mgrCtx, mgr.GetAPIReader())
if err != nil {
return err
}

infrastructureTopology, err := common.GetInfrastructureTopology(mgr.GetAPIReader())
if missingCrds := crdWatch.MissingCrds(); len(missingCrds) > 0 {
mgr.GetLogger().Error(nil, "Some required crds are missing. The operator will not create any new resources.",
"missingCrds", missingCrds,
)
}

err = mgr.Add(crdWatch)
if err != nil {
return err
}

serviceController, err := CreateServiceController(mgr)
infrastructureTopology, err := common.GetInfrastructureTopology(mgrCtx, mgr.GetAPIReader())
if err != nil {
return err
return fmt.Errorf("failed to get infrastructure topology: %w", err)
}

serviceController, err := CreateServiceController(mgrCtx, mgr)
if err != nil {
return fmt.Errorf("failed to create service controller: %w", err)
}

err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
err := serviceController.Start(ctx, mgr)
if err != nil {
return fmt.Errorf("error adding serviceController: %w", err)
return fmt.Errorf("error starting serviceController: %w", err)
}

mgr.GetLogger().Info("Services Controller started")

return nil
}))
if err != nil {
return err
return fmt.Errorf("error adding service controller: %w", err)
}

reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands)
reconciler := NewSspReconciler(mgr.GetClient(), mgr.GetAPIReader(), infrastructureTopology, sspOperands, crdWatch)

if requiredCrdsExist(requiredCrds, crdList.Items) {
// No need to start CRD controller
return reconciler.setupController(mgr)
}

mgr.GetLogger().Info("Required CRDs do not exist. Waiting until they are installed.",
"required_crds", requiredCrds,
)

crdController, err := CreateCrdController(mgr, requiredCrds)
err = reconciler.setupController(mgr)
if err != nil {
return err
}

return mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
// First start the CRD controller
err := crdController.Start(ctx)
if err != nil {
return err
}

mgr.GetLogger().Info("Required CRDs were installed, starting SSP operator.")

// Clear variable, so it can be garbage collected
crdController = nil

// After it is finished, add the SSP controller to the manager
return reconciler.setupController(mgr)
}))
}

func requiredCrdsExist(required []string, foundCrds []extv1.CustomResourceDefinition) bool {
OuterLoop:
for i := range required {
for j := range foundCrds {
if required[i] == foundCrds[j].Name {
continue OuterLoop
}
}
return false
mgr.GetLogger().Info("starting manager")
if err := mgr.Start(mgrCtx); err != nil {
mgr.GetLogger().Error(err, "problem running manager")
return err
}
return true
return nil
}
Loading

0 comments on commit c336c0b

Please sign in to comment.