From 224c213110aca9fb4b6048c9b512be0d86fa33d1 Mon Sep 17 00:00:00 2001 From: lburgazzoli Date: Thu, 21 Feb 2019 18:46:52 +0100 Subject: [PATCH] add finalizer to ensure integration children are cleaned up #477 --- pkg/apis/camel/v1alpha1/integration_types.go | 2 + pkg/cmd/run.go | 10 ++ pkg/controller/integration/delete.go | 103 ++++++++++++++++++ .../integration/integration_controller.go | 12 ++ pkg/trait/gc.go | 85 ++------------- pkg/trait/owner.go | 20 ++++ pkg/trait/owner_test.go | 37 +++++++ pkg/trait/util.go | 1 + pkg/util/finalizer/finalizer.go | 78 +++++++++++++ pkg/util/kubernetes/util.go | 73 +++++++++++++ 10 files changed, 345 insertions(+), 76 deletions(-) create mode 100644 pkg/controller/integration/delete.go create mode 100644 pkg/trait/owner_test.go create mode 100644 pkg/util/finalizer/finalizer.go diff --git a/pkg/apis/camel/v1alpha1/integration_types.go b/pkg/apis/camel/v1alpha1/integration_types.go index 3118324dec..e695af86e5 100644 --- a/pkg/apis/camel/v1alpha1/integration_types.go +++ b/pkg/apis/camel/v1alpha1/integration_types.go @@ -147,6 +147,8 @@ const ( IntegrationPhaseError IntegrationPhase = "Error" // IntegrationPhaseBuildFailureRecovery -- IntegrationPhaseBuildFailureRecovery IntegrationPhase = "Building Failure Recovery" + // IntegrationPhaseDeleting -- + IntegrationPhaseDeleting IntegrationPhase = "Deleting" ) func init() { diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index ffacefa88b..199e6b5708 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -31,6 +31,8 @@ import ( "strings" "syscall" + "github.com/apache/camel-k/pkg/util/finalizer" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/gzip" @@ -84,6 +86,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command { cmd.Flags().BoolVar(&options.Compression, "compression", false, "Enable store source as a compressed binary blob") cmd.Flags().StringSliceVar(&options.Resources, "resource", nil, "Add a resource") cmd.Flags().StringSliceVar(&options.OpenAPIs, "open-api", nil, "Add an OpenAPI v2 spec") + cmd.Flags().BoolVar(&options.Owner, "owner", true, "Use resource ownership to cleanup child resources, if set to false finalizers are used") // completion support configureKnownCompletions(&cmd) @@ -98,6 +101,7 @@ type runCmdOptions struct { Logs bool Sync bool Dev bool + Owner bool IntegrationContext string Runtime string IntegrationName string @@ -338,6 +342,12 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string) }) } + if !o.Owner { + integration.Finalizers = []string{ + finalizer.CamelIntegrationFinalizer, + } + } + if o.Runtime != "" { integration.Spec.AddDependency("runtime:" + o.Runtime) } diff --git a/pkg/controller/integration/delete.go b/pkg/controller/integration/delete.go new file mode 100644 index 0000000000..a05d821959 --- /dev/null +++ b/pkg/controller/integration/delete.go @@ -0,0 +1,103 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + "fmt" + + "github.com/apache/camel-k/pkg/util/finalizer" + + "github.com/apache/camel-k/pkg/util/log" + + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/pkg/util/kubernetes" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" +) + +// NewDeleteAction creates a new monitoring action for an integration +func NewDeleteAction() Action { + return &deleteAction{} +} + +type deleteAction struct { + baseAction +} + +func (action *deleteAction) Name() string { + return "delete" +} + +func (action *deleteAction) CanHandle(integration *v1alpha1.Integration) bool { + return integration.Status.Phase == v1alpha1.IntegrationPhaseDeleting +} + +func (action *deleteAction) Handle(ctx context.Context, integration *v1alpha1.Integration) error { + ok, err := finalizer.Exists(integration, finalizer.CamelIntegrationFinalizer) + if err != nil { + return err + } + + // If the integration does not have any finalizer, just skip this step + if !ok { + return nil + } + + l := log.Log.ForIntegration(integration) + + // Select all resources created by this integration + selectors := []string{ + fmt.Sprintf("camel.apache.org/integration=%s", integration.Name), + } + + resources, err := kubernetes.LookUpResources(ctx, action.client, integration.Namespace, selectors) + if err != nil { + return err + } + + // And delete them + for _, resource := range resources { + // pin the resource + resource := resource + + l.Infof("Deleting child resource: %s/%s", resource.GetKind(), resource.GetName()) + + err = action.client.Delete(ctx, &resource) + if err != nil { + // The resource may have already been deleted + if !k8serrors.IsNotFound(err) { + l.Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName()) + } + } else { + l.Infof("Child resource deleted: %s/%s", resource.GetKind(), resource.GetName()) + } + } + + target := integration.DeepCopy() + + // + // Remove the finalizer to unlock resource + // + _, err = finalizer.Remove(target, finalizer.CamelIntegrationFinalizer) + if err != nil { + return err + } + + return action.client.Update(ctx, target) +} diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index f5f5c7daa0..7a2b5ad0d3 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -8,6 +8,7 @@ import ( "github.com/apache/camel-k/pkg/client" "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -115,6 +116,12 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R NewDeployAction(), NewErrorRecoveryAction(), NewMonitorAction(), + NewDeleteAction(), + } + + // Delete phase + if instance.GetDeletionTimestamp() != nil { + instance.Status.Phase = camelv1alpha1.IntegrationPhaseDeleting } ilog := rlog.ForIntegration(instance) @@ -131,12 +138,17 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R // Fetch the Integration again and check the state if err = r.client.Get(ctx, request.NamespacedName, instance); err != nil { + if k8serrors.IsNotFound(err) && instance.Status.Phase == camelv1alpha1.IntegrationPhaseDeleting { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err } if instance.Status.Phase == camelv1alpha1.IntegrationPhaseRunning { return reconcile.Result{}, nil } + // Requeue return reconcile.Result{ RequeueAfter: 5 * time.Second, diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go index d05e5b635e..0834e7d14d 100644 --- a/pkg/trait/gc.go +++ b/pkg/trait/gc.go @@ -21,16 +21,13 @@ import ( "context" "fmt" "strconv" - "strings" + + "github.com/apache/camel-k/pkg/util/kubernetes" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" - k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" - "github.com/apache/camel-k/pkg/client" ) type garbageCollectorTrait struct { @@ -76,8 +73,14 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error { // Register a post action that deletes the existing resources that are labelled // with the previous integration generations. e.PostActions = append(e.PostActions, func(environment *Environment) error { + selectors := []string{ + fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name), + "camel.apache.org/generation", + fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()), + } + // Retrieve older generation resources that may can enlisted for garbage collection - resources, err := getOldGenerationResources(e) + resources, err := kubernetes.LookUpResources(context.TODO(), e.Client, e.Integration.Namespace, selectors) if err != nil { return err } @@ -102,73 +105,3 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error { return nil } - -func getOldGenerationResources(e *Environment) ([]unstructured.Unstructured, error) { - // We rely on the discovery API to retrieve all the resources group and kind. - // That results in an unbounded collection that can be a bit slow (a couple of seconds). - // We may want to refine that step by white-listing or enlisting types to speed-up - // the collection duration. - types, err := getDiscoveryTypes(e.Client) - if err != nil { - return nil, err - } - - selectors := []string{ - fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name), - "camel.apache.org/generation", - fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()), - } - - selector, err := labels.Parse(strings.Join(selectors, ",")) - if err != nil { - return nil, err - } - - res := make([]unstructured.Unstructured, 0) - - for _, t := range types { - options := k8sclient.ListOptions{ - Namespace: e.Integration.Namespace, - LabelSelector: selector, - Raw: &metav1.ListOptions{ - TypeMeta: t, - }, - } - list := unstructured.UnstructuredList{ - Object: map[string]interface{}{ - "apiVersion": t.APIVersion, - "kind": t.Kind, - }, - } - if err := e.Client.List(context.TODO(), &options, &list); err != nil { - if k8serrors.IsNotFound(err) || - k8serrors.IsForbidden(err) || - k8serrors.IsMethodNotSupported(err) { - continue - } - return nil, err - } - - res = append(res, list.Items...) - } - return res, nil -} - -func getDiscoveryTypes(client client.Client) ([]metav1.TypeMeta, error) { - resources, err := client.Discovery().ServerPreferredNamespacedResources() - if err != nil { - return nil, err - } - - types := make([]metav1.TypeMeta, 0) - for _, resource := range resources { - for _, r := range resource.APIResources { - types = append(types, metav1.TypeMeta{ - Kind: r.Kind, - APIVersion: resource.GroupVersion, - }) - } - } - - return types, nil -} diff --git a/pkg/trait/owner.go b/pkg/trait/owner.go index f4d37376f4..4dd2a2e241 100644 --- a/pkg/trait/owner.go +++ b/pkg/trait/owner.go @@ -20,6 +20,9 @@ package trait import ( "strings" + "github.com/apache/camel-k/pkg/util/finalizer" + "github.com/pkg/errors" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -44,6 +47,23 @@ func (t *ownerTrait) Configure(e *Environment) (bool, error) { return false, nil } + if e.Integration == nil { + return false, nil + } + + ok, err := finalizer.Exists(e.Integration, finalizer.CamelIntegrationFinalizer) + if err != nil { + return false, errors.Wrap(err, "failed to read finalizer"+finalizer.CamelIntegrationFinalizer) + } + + if ok { + // + // do not enable this trait if the integration has + // a finalizer + // + return false, nil + } + return e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying), nil } diff --git a/pkg/trait/owner_test.go b/pkg/trait/owner_test.go new file mode 100644 index 0000000000..27d0f47d21 --- /dev/null +++ b/pkg/trait/owner_test.go @@ -0,0 +1,37 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trait + +import ( + "testing" + + "github.com/apache/camel-k/pkg/util/finalizer" + + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" + "github.com/stretchr/testify/assert" +) + +func TestOwnerWithFinalizer(t *testing.T) { + env := createTestEnv(t, v1alpha1.IntegrationPlatformClusterOpenShift, "camel:core") + env.Integration.Finalizers = []string{finalizer.CamelIntegrationFinalizer} + + processTestEnv(t, env) + + assert.NotEmpty(t, env.ExecutedTraits) + assert.Nil(t, env.GetTrait(ID("owner"))) +} diff --git a/pkg/trait/util.go b/pkg/trait/util.go index b0dbff7a24..ea500a74dc 100644 --- a/pkg/trait/util.go +++ b/pkg/trait/util.go @@ -25,6 +25,7 @@ import ( "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" + k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) diff --git a/pkg/util/finalizer/finalizer.go b/pkg/util/finalizer/finalizer.go new file mode 100644 index 0000000000..d3f13236e0 --- /dev/null +++ b/pkg/util/finalizer/finalizer.go @@ -0,0 +1,78 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package finalizer + +import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + // CamelIntegrationFinalizer -- + CamelIntegrationFinalizer = "finalizer.integration.camel.apache.org" +) + +// Add -- +func Add(obj runtime.Object, value string) error { + accessor, err := meta.Accessor(obj) + if err != nil { + return err + } + + finalizers := sets.NewString(accessor.GetFinalizers()...) + finalizers.Insert(value) + accessor.SetFinalizers(finalizers.List()) + + return nil +} + +// Exists -- +func Exists(obj runtime.Object, finalizer string) (bool, error) { + fzs, err := GetAll(obj) + if err != nil { + return false, err + } + for _, fin := range fzs { + if fin == finalizer { + return true, nil + } + } + return false, nil +} + +// GetAll -- +func GetAll(obj runtime.Object) ([]string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + return accessor.GetFinalizers(), nil +} + +// Remove -- +func Remove(obj runtime.Object, value string) ([]string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, err + } + finalizers := sets.NewString(accessor.GetFinalizers()...) + finalizers.Delete(value) + newFinalizers := finalizers.List() + accessor.SetFinalizers(newFinalizers) + return newFinalizers, nil +} diff --git a/pkg/util/kubernetes/util.go b/pkg/util/kubernetes/util.go index c7987c9777..99264c2ac4 100644 --- a/pkg/util/kubernetes/util.go +++ b/pkg/util/kubernetes/util.go @@ -20,6 +20,10 @@ package kubernetes import ( "context" "fmt" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" @@ -27,8 +31,11 @@ import ( yaml2 "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/json" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -118,3 +125,69 @@ func GetIntegration(context context.Context, client client.Client, name string, return &answer, nil } + +// GetDiscoveryTypes -- +func GetDiscoveryTypes(client client.Client) ([]metav1.TypeMeta, error) { + resources, err := client.Discovery().ServerPreferredNamespacedResources() + if err != nil { + return nil, err + } + + types := make([]metav1.TypeMeta, 0) + for _, resource := range resources { + for _, r := range resource.APIResources { + types = append(types, metav1.TypeMeta{ + Kind: r.Kind, + APIVersion: resource.GroupVersion, + }) + } + } + + return types, nil +} + +// LookUpResources -- +func LookUpResources(ctx context.Context, client client.Client, namespace string, selectors []string) ([]unstructured.Unstructured, error) { + // We rely on the discovery API to retrieve all the resources group and kind. + // That results in an unbounded collection that can be a bit slow (a couple of seconds). + // We may want to refine that step by white-listing or enlisting types to speed-up + // the collection duration. + types, err := GetDiscoveryTypes(client) + if err != nil { + return nil, err + } + + selector, err := labels.Parse(strings.Join(selectors, ",")) + if err != nil { + return nil, err + } + + res := make([]unstructured.Unstructured, 0) + + for _, t := range types { + options := k8sclient.ListOptions{ + Namespace: namespace, + LabelSelector: selector, + Raw: &metav1.ListOptions{ + TypeMeta: t, + }, + } + list := unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": t.APIVersion, + "kind": t.Kind, + }, + } + if err := client.List(ctx, &options, &list); err != nil { + if k8serrors.IsNotFound(err) || + k8serrors.IsForbidden(err) || + k8serrors.IsMethodNotSupported(err) { + continue + } + return nil, err + } + + res = append(res, list.Items...) + } + return res, nil +}