Skip to content

Commit

Permalink
add finalizer to ensure integration children are cleaned up apache#477
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli authored and valdar committed May 21, 2019
1 parent 92c957b commit 739b9aa
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 77 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/camel/v1alpha1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ const (
IntegrationPhaseError IntegrationPhase = "Error"
// IntegrationPhaseBuildFailureRecovery --
IntegrationPhaseBuildFailureRecovery IntegrationPhase = "Building Failure Recovery"
// IntegrationPhaseDeleting --
IntegrationPhaseDeleting IntegrationPhase = "Deleting"
)

func init() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/cmd/completion_bash.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ __kamel_runtimes() {
COMPREPLY=( $( compgen -W "${type_list}" -- "$cur") )
}
__kamel_deletion_policy() {
local type_list="owner label"
COMPREPLY=( $( compgen -W "${type_list}" -- "$cur") )
}
__kamel_kubectl_get_configmap() {
local template
local kubectl_out
Expand Down Expand Up @@ -244,6 +249,13 @@ func configureKnownBashCompletions(command *cobra.Command) {
cobra.BashCompCustom: {"__kamel_traits"},
},
)
configureBashAnnotationForFlag(
command,
"deletion-policy",
map[string][]string{
cobra.BashCompCustom: {"__kamel_deletion_policy"},
},
)
}

func configureBashAnnotationForFlag(command *cobra.Command, flagName string, annotations map[string][]string) {
Expand Down
12 changes: 11 additions & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"strings"
"syscall"

"github.com/apache/camel-k/pkg/util/finalizer"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/gzip"
Expand Down Expand Up @@ -68,7 +70,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
cmd.Flags().StringVar(&options.IntegrationName, "name", "", "The integration name")
cmd.Flags().StringSliceVarP(&options.Dependencies, "dependency", "d", nil, "The integration dependency")
cmd.Flags().BoolVarP(&options.Wait, "wait", "w", false, "Waits for the integration to be running")
cmd.Flags().StringVarP(&options.IntegrationContext, "context", "x", "", "The contex used to run the integration")
cmd.Flags().StringVarP(&options.IntegrationContext, "context", "x", "", "The context used to run the integration")
cmd.Flags().StringArrayVarP(&options.Properties, "property", "p", nil, "Add a camel property")
cmd.Flags().StringSliceVar(&options.ConfigMaps, "configmap", nil, "Add a ConfigMap")
cmd.Flags().StringSliceVar(&options.Secrets, "secret", nil, "Add a Secret")
Expand All @@ -84,6 +86,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
cmd.Flags().BoolVar(&options.Compression, "compression", false, "Enable store source as a compressed binary blob")
cmd.Flags().StringSliceVar(&options.Resources, "resource", nil, "Add a resource")
cmd.Flags().StringSliceVar(&options.OpenAPIs, "open-api", nil, "Add an OpenAPI v2 spec")
cmd.Flags().StringVar(&options.DeletionPolicy, "deletion-policy", "owner", "Policy used to cleanup child resources, default owner")

// completion support
configureKnownCompletions(&cmd)
Expand All @@ -98,6 +101,7 @@ type runCmdOptions struct {
Logs bool
Sync bool
Dev bool
DeletionPolicy string
IntegrationContext string
Runtime string
IntegrationName string
Expand Down Expand Up @@ -338,6 +342,12 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string)
})
}

if o.DeletionPolicy == "label" {
integration.Finalizers = []string{
finalizer.CamelIntegrationFinalizer,
}
}

if o.Runtime != "" {
integration.Spec.AddDependency("runtime:" + o.Runtime)
}
Expand Down
152 changes: 152 additions & 0 deletions pkg/controller/integration/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/apache/camel-k/pkg/util/finalizer"

"github.com/apache/camel-k/pkg/util/log"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util/kubernetes"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

// NewDeleteAction creates a new monitoring action for an integration
func NewDeleteAction() Action {
return &deleteAction{}
}

type deleteAction struct {
baseAction
}

func (action *deleteAction) Name() string {
return "delete"
}

func (action *deleteAction) CanHandle(integration *v1alpha1.Integration) bool {
return integration.Status.Phase == v1alpha1.IntegrationPhaseDeleting
}

func (action *deleteAction) Handle(ctx context.Context, integration *v1alpha1.Integration) error {
l := log.Log.ForIntegration(integration)

ok, err := finalizer.Exists(integration, finalizer.CamelIntegrationFinalizer)
if err != nil {
return err
}
if !ok {
return nil
}

target := integration.DeepCopy()

// Select all resources created by this integration
selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", integration.Name),
}

resources, err := kubernetes.LookUpResources(ctx, action.client, integration.Namespace, selectors)
if err != nil {
return err
}

// If the ForegroundDeletion deletion is set remove the finalizer and
// delete child resources from a dedicated goroutine
ok, err = finalizer.Exists(integration, finalizer.ForegroundDeletion)
if err != nil {
return err
}

if ok {
//
// Async
//

if err := action.removeFinalizer(ctx, target); err != nil {
return err
}

go func() {
if err := action.deleteResources(context.TODO(), target, resources); err != nil {
l.Error(err, "error deleting child resources")
}
}()
} else {
//
// Sync
//
if err := action.deleteResources(ctx, target, resources); err != nil {
return err
}
if err = action.removeFinalizer(ctx, target); err != nil {
return err
}
}

return nil
}

func (action *deleteAction) removeFinalizer(ctx context.Context, integration *v1alpha1.Integration) error {
_, err := finalizer.Remove(integration, finalizer.CamelIntegrationFinalizer)
if err != nil {
return err
}

return action.client.Update(ctx, integration)
}

func (action *deleteAction) deleteResources(ctx context.Context, integration *v1alpha1.Integration, resources []unstructured.Unstructured) error {
l := log.Log.ForIntegration(integration)

// And delete them
for _, resource := range resources {
// pin the resource
resource := resource

// Pods are automatically deleted by the removal of Deployment
if resource.GetKind() == "Pod" {
continue
}
// ReplicaSet are automatically deleted by the removal of Deployment
if resource.GetKind() == "ReplicaSet" {
continue
}

l.Infof("Deleting child resource: %s/%s", resource.GetKind(), resource.GetName())

err := action.client.Delete(ctx, &resource)
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
l.Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName())
}
} else {
l.Infof("Child resource deleted: %s/%s", resource.GetKind(), resource.GetName())
}
}

return nil
}
12 changes: 12 additions & 0 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/apache/camel-k/pkg/client"

"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"

"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -115,6 +116,12 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R
NewDeployAction(),
NewErrorRecoveryAction(),
NewMonitorAction(),
NewDeleteAction(),
}

// Delete phase
if instance.GetDeletionTimestamp() != nil {
instance.Status.Phase = camelv1alpha1.IntegrationPhaseDeleting
}

ilog := rlog.ForIntegration(instance)
Expand All @@ -131,12 +138,17 @@ func (r *ReconcileIntegration) Reconcile(request reconcile.Request) (reconcile.R

// Fetch the Integration again and check the state
if err = r.client.Get(ctx, request.NamespacedName, instance); err != nil {
if k8serrors.IsNotFound(err) && instance.Status.Phase == camelv1alpha1.IntegrationPhaseDeleting {
return reconcile.Result{}, nil
}

return reconcile.Result{}, err
}

if instance.Status.Phase == camelv1alpha1.IntegrationPhaseRunning {
return reconcile.Result{}, nil
}

// Requeue
return reconcile.Result{
RequeueAfter: 5 * time.Second,
Expand Down
85 changes: 9 additions & 76 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import (
"context"
"fmt"
"strconv"
"strings"

"github.com/apache/camel-k/pkg/util/kubernetes"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
)

type garbageCollectorTrait struct {
Expand Down Expand Up @@ -76,8 +73,14 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {
// Register a post action that deletes the existing resources that are labelled
// with the previous integration generations.
e.PostActions = append(e.PostActions, func(environment *Environment) error {
selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name),
"camel.apache.org/generation",
fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()),
}

// Retrieve older generation resources that may can enlisted for garbage collection
resources, err := getOldGenerationResources(e)
resources, err := kubernetes.LookUpResources(context.TODO(), e.Client, e.Integration.Namespace, selectors)
if err != nil {
return err
}
Expand All @@ -102,73 +105,3 @@ func (t *garbageCollectorTrait) Apply(e *Environment) error {

return nil
}

func getOldGenerationResources(e *Environment) ([]unstructured.Unstructured, error) {
// We rely on the discovery API to retrieve all the resources group and kind.
// That results in an unbounded collection that can be a bit slow (a couple of seconds).
// We may want to refine that step by white-listing or enlisting types to speed-up
// the collection duration.
types, err := getDiscoveryTypes(e.Client)
if err != nil {
return nil, err
}

selectors := []string{
fmt.Sprintf("camel.apache.org/integration=%s", e.Integration.Name),
"camel.apache.org/generation",
fmt.Sprintf("camel.apache.org/generation notin (%d)", e.Integration.GetGeneration()),
}

selector, err := labels.Parse(strings.Join(selectors, ","))
if err != nil {
return nil, err
}

res := make([]unstructured.Unstructured, 0)

for _, t := range types {
options := k8sclient.ListOptions{
Namespace: e.Integration.Namespace,
LabelSelector: selector,
Raw: &metav1.ListOptions{
TypeMeta: t,
},
}
list := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": t.APIVersion,
"kind": t.Kind,
},
}
if err := e.Client.List(context.TODO(), &options, &list); err != nil {
if k8serrors.IsNotFound(err) ||
k8serrors.IsForbidden(err) ||
k8serrors.IsMethodNotSupported(err) {
continue
}
return nil, err
}

res = append(res, list.Items...)
}
return res, nil
}

func getDiscoveryTypes(client client.Client) ([]metav1.TypeMeta, error) {
resources, err := client.Discovery().ServerPreferredNamespacedResources()
if err != nil {
return nil, err
}

types := make([]metav1.TypeMeta, 0)
for _, resource := range resources {
for _, r := range resource.APIResources {
types = append(types, metav1.TypeMeta{
Kind: r.Kind,
APIVersion: resource.GroupVersion,
})
}
}

return types, nil
}
Loading

0 comments on commit 739b9aa

Please sign in to comment.