Skip to content

Commit

Permalink
chore(integration controller): replace custom reconcile trigger with …
Browse files Browse the repository at this point in the history
…watch
  • Loading branch information
lburgazzoli authored and nicolaferraro committed Jun 26, 2019
1 parent 1de2681 commit 3a4c882
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 34 deletions.
40 changes: 37 additions & 3 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,45 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for IntegrationPlatform phase transitioning to ready
// and enqueue requests for any integrations that are in phase waiting for platform
// Watch for IntegrationKit phase transitioning to ready or error and
// enqueue requests for any integrations that are in phase waiting for
// kit
err = c.Watch(&source.Kind{Type: &v1alpha1.IntegrationKit{}}, &handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
kit := a.Object.(*v1alpha1.IntegrationKit)
var requests []reconcile.Request

if kit.Status.Phase == v1alpha1.IntegrationKitPhaseReady || kit.Status.Phase == v1alpha1.IntegrationKitPhaseError {
list := &v1alpha1.IntegrationList{}

if err := mgr.GetClient().List(context.TODO(), &k8sclient.ListOptions{Namespace: kit.Namespace}, list); err != nil {
log.Error(err, "Failed to retrieve integration list")
return requests
}

for _, integration := range list.Items {
if integration.Status.Phase == v1alpha1.IntegrationPhaseBuildingKit {
log.Infof("Kit %s ready, wake-up integration: %s", kit.Name, integration.Name)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: integration.Namespace,
Name: integration.Name,
},
})
}
}
}

return requests
}),
})

// Watch for IntegrationPlatform phase transitioning to ready and enqueue
// requests for any integrations that are in phase waiting for platform
err = c.Watch(&source.Kind{Type: &v1alpha1.IntegrationPlatform{}}, &handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
platform := a.Object.(*v1alpha1.IntegrationPlatform)
requests := []reconcile.Request{}
var requests []reconcile.Request

if platform.Status.Phase == v1alpha1.IntegrationPlatformPhaseReady {
list := &v1alpha1.IntegrationList{}
Expand All @@ -126,6 +159,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {

for _, integration := range list.Items {
if integration.Status.Phase == v1alpha1.IntegrationPhaseWaitingForPlatform {
log.Infof("Platform %s ready, wake-up integration: %s", platform.Name, integration.Name)
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: integration.Namespace,
Expand Down
35 changes: 4 additions & 31 deletions pkg/controller/integrationkit/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ import (
"fmt"

"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/builder"
"github.com/apache/camel-k/pkg/trait"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/pkg/errors"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/builder"
"github.com/apache/camel-k/pkg/trait"
)

// NewBuildAction creates a new build request handling action for the kit
Expand Down Expand Up @@ -175,11 +174,6 @@ func (action *buildAction) handleBuildRunning(ctx context.Context, kit *v1alpha1
return err
}

action.L.Info("Inform integrations about kit state change")
if err := action.informIntegrations(ctx, target); err != nil {
return err
}

case v1alpha1.BuildPhaseError, v1alpha1.BuildPhaseInterrupted:
target := kit.DeepCopy()

Expand All @@ -204,24 +198,3 @@ func (action *buildAction) handleBuildRunning(ctx context.Context, kit *v1alpha1

return nil
}

// informIntegrations triggers the processing of all integrations waiting for this kit to be built
func (action *buildAction) informIntegrations(ctx context.Context, kit *v1alpha1.IntegrationKit) error {
list := v1alpha1.NewIntegrationList()
err := action.client.List(ctx, &k8sclient.ListOptions{Namespace: kit.Namespace}, &list)
if err != nil {
return err
}
for _, integration := range list.Items {
integration := integration // pin
if integration.Status.Kit != kit.Name {
continue
}
integration.Status.Phase = v1alpha1.IntegrationPhaseResolvingKit
err = action.client.Status().Update(ctx, &integration)
if err != nil {
return err
}
}
return nil
}

0 comments on commit 3a4c882

Please sign in to comment.