Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GC trait discovery client caching options and server-side deletion #1062

Merged
merged 6 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/modules/ROOT/pages/traits.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,12 @@ There are also platform traits that **normally should not be configured** by the
+
It's enabled by default.

[cols="m,"]
!===

! gc.discovery-cache
! Discovery client cache to be used, either `disabled`, `disk` or `memory` (default `memory`)

!===

|=======================
15 changes: 14 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@ import (
"os/user"
"path/filepath"

"github.com/apache/camel-k/pkg/apis"
"github.com/operator-framework/operator-sdk/pkg/k8sutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/client-go/kubernetes"
clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
controller "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/apache/camel-k/pkg/apis"
)

const inContainerNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
Expand All @@ -47,6 +52,7 @@ type Client interface {
controller.Client
kubernetes.Interface
GetScheme() *runtime.Scheme
GetConfig() *rest.Config
}

// Injectable identifies objects that can receive a Client
Expand All @@ -63,12 +69,17 @@ type defaultClient struct {
controller.Client
kubernetes.Interface
scheme *runtime.Scheme
config *rest.Config
}

func (c *defaultClient) GetScheme() *runtime.Scheme {
return c.scheme
}

func (c *defaultClient) GetConfig() *rest.Config {
return c.config
}

// NewOutOfClusterClient creates a new k8s client that can be used from outside the cluster
func NewOutOfClusterClient(kubeconfig string) (Client, error) {
initialize(kubeconfig)
Expand Down Expand Up @@ -115,6 +126,7 @@ func NewClient(fastDiscovery bool) (Client, error) {
Client: dynClient,
Interface: clientset,
scheme: clientOptions.Scheme,
config: cfg,
}, nil
}

Expand All @@ -129,6 +141,7 @@ func FromManager(manager manager.Manager) (Client, error) {
Client: manager.GetClient(),
Interface: clientset,
scheme: manager.GetScheme(),
config: manager.GetConfig(),
}, nil
}

Expand Down
208 changes: 144 additions & 64 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,47 @@ package trait

import (
"context"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/discovery/cached/memory"

controller "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/util"
controllerUtil "github.com/apache/camel-k/pkg/util/controller"
util "github.com/apache/camel-k/pkg/util/controller"
)

var (
toFileName = regexp.MustCompile(`[^(\w/\.)]`)
diskCachedDiscoveryClient discovery.CachedDiscoveryInterface
memoryCachedDiscoveryClient discovery.CachedDiscoveryInterface
DiscoveryClientLock sync.Mutex
)

type discoveryCacheType string

const (
disabledDiscoveryCache discoveryCacheType = "disabled"
diskDiscoveryCache discoveryCacheType = "disk"
memoryDiscoveryCache discoveryCacheType = "memory"
)

type garbageCollectorTrait struct {
BaseTrait `property:",squash"`
BaseTrait `property:",squash"`
DiscoveryCache *discoveryCacheType `property:"discovery-cache"`
}

func newGarbageCollectorTrait() *garbageCollectorTrait {
Expand All @@ -51,6 +73,11 @@ func (t *garbageCollectorTrait) Configure(e *Environment) (bool, error) {
return false, nil
}

if t.DiscoveryCache == nil {
s := memoryDiscoveryCache
t.DiscoveryCache = &s
}

return e.IntegrationInPhase(
v1alpha1.IntegrationPhaseInitialization,
v1alpha1.IntegrationPhaseDeploying,
Expand Down Expand Up @@ -104,92 +131,145 @@ func (t *garbageCollectorTrait) garbageCollectResources(e *Environment) {
Add(*integration).
Add(*generation)

// Retrieve older generation resources to be enlisted for garbage collection.
// We rely on the discovery API to retrieve all the resources group and kind.
// That results in an unbounded collection that can be a bit slow.
// We may want to refine that step by white-listing or enlisting types to speed-up
// the collection duration.
resources, err := lookUpResources(context.TODO(), e.Client, e.Integration.Namespace, selector)
collectionGVKs, deletableGVKs, err := t.getDeletableTypes(e)
if err != nil {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot collect older generation resources")
t.L.ForIntegration(e.Integration).Errorf(err, "cannot discover GVK types")
return
}

// And delete them
for _, resource := range resources {
// pin the resource
resource := resource
err = e.Client.Delete(context.TODO(), &resource, controller.PropagationPolicy(metav1.DeletePropagationBackground))
t.deleteAllOf(collectionGVKs, e, selector)
// TODO: DeleteCollection is currently not supported for Service resources, so we have to keep
// client-side collection deletion around until it becomes supported.
t.deleteEachOf(deletableGVKs, e, selector)
}

func (t *garbageCollectorTrait) deleteAllOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
for gvk := range gvks {
err := e.Client.DeleteAllOf(context.TODO(),
&unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": gvk.GroupVersion().String(),
"kind": gvk.Kind,
"metadata": map[string]interface{}{
"namespace": e.Integration.Namespace,
},
},
},
// FIXME: The unstructured client doesn't take the namespace option into account
//controller.InNamespace(e.Integration.Namespace),
util.MatchingSelector{Selector: selector},
client.PropagationPolicy(metav1.DeletePropagationBackground),
)
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName())
}
t.L.ForIntegration(e.Integration).Errorf(err, "cannot delete child resources: %v", gvk)
} else {
t.L.ForIntegration(e.Integration).Debugf("child resource deleted: %s/%s", resource.GetKind(), resource.GetName())
t.L.ForIntegration(e.Integration).Debugf("child resources deleted: %v", gvk)
}
}
}

func lookUpResources(ctx context.Context, client client.Client, namespace string, selector labels.Selector) ([]unstructured.Unstructured, error) {
// We only take types that support the "create" and "list" verbs as:
// - they have to be created to be deleted :) so that excludes read-only
// resources, e.g., aggregated APIs
// - they are going to be iterated and a list query with labels selector
// is performed for each of them. That prevents from performing queries
// that we know are going to return "MethodNotAllowed".
types, err := getDiscoveryTypesWithVerbs(client, []string{"create", "list"})
if err != nil {
return nil, err
}

res := make([]unstructured.Unstructured, 0)

for _, t := range types {
list := unstructured.UnstructuredList{
func (t *garbageCollectorTrait) deleteEachOf(gvks map[schema.GroupVersionKind]struct{}, e *Environment, selector labels.Selector) {
for gvk := range gvks {
resources := unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": t.APIVersion,
"kind": t.Kind,
"apiVersion": gvk.GroupVersion().String(),
"kind": gvk.Kind,
},
}
options := []controller.ListOption{
controller.InNamespace(namespace),
controllerUtil.MatchingSelector{Selector: selector},
options := []client.ListOption{
client.InNamespace(e.Integration.Namespace),
util.MatchingSelector{Selector: selector},
}
if err := client.List(ctx, &list, options...); err != nil {
if k8serrors.IsNotFound(err) || k8serrors.IsForbidden(err) {
continue
if err := t.client.List(context.TODO(), &resources, options...); err != nil {
if !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot list child resources: %v", gvk)
}
return nil, err
continue
}

res = append(res, list.Items...)
for _, resource := range resources.Items {
r := resource
err := t.client.Delete(context.TODO(), &r, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil {
// The resource may have already been deleted
if !k8serrors.IsNotFound(err) {
t.L.ForIntegration(e.Integration).Errorf(err, "cannot delete child resource: %s/%s", resource.GetKind(), resource.GetName())
}
} else {
t.L.ForIntegration(e.Integration).Debugf("child resource deleted: %s/%s", resource.GetKind(), resource.GetName())
}
}
}
return res, nil
}

func getDiscoveryTypesWithVerbs(client client.Client, verbs []string) ([]metav1.TypeMeta, error) {
resources, err := client.Discovery().ServerPreferredNamespacedResources()
func (t *garbageCollectorTrait) getDeletableTypes(e *Environment) (map[schema.GroupVersionKind]struct{}, map[schema.GroupVersionKind]struct{}, error) {
// We rely on the discovery API to retrieve all the resources GVK,
// that results in an unbounded set that can impact garbage collection latency when scaling up.
discoveryClient, err := t.discoveryClient(e)
if err != nil {
return nil, nil, err
}
resources, err := discoveryClient.ServerPreferredNamespacedResources()
// Swallow group discovery errors, e.g., Knative serving exposes
// an aggregated API for custom.metrics.k8s.io that requires special
// authentication scheme while discovering preferred resources
if err != nil && !discovery.IsGroupDiscoveryFailedError(err) {
return nil, err
return nil, nil, err
}

types := make([]metav1.TypeMeta, 0)
for _, resource := range resources {
for _, r := range resource.APIResources {
if len(verbs) > 0 && !util.StringSliceContains(r.Verbs, verbs) {
// Do not return the type if it does not support the provided verbs
continue
}
types = append(types, metav1.TypeMeta{
Kind: r.Kind,
APIVersion: resource.GroupVersion,
})
// We only take types that support the "delete" and "deletecollection" verbs,
// to prevents from performing queries that we know are going to return "MethodNotAllowed".
return groupVersionKinds(discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"deletecollection"}}, resources)),
groupVersionKinds(discovery.FilteredBy(supportsDeleteVerbOnly{}, resources)),
nil
}

func groupVersionKinds(rls []*metav1.APIResourceList) map[schema.GroupVersionKind]struct{} {
GVKs := map[schema.GroupVersionKind]struct{}{}
for _, rl := range rls {
for _, r := range rl.APIResources {
GVKs[schema.FromAPIVersionAndKind(rl.GroupVersion, r.Kind)] = struct{}{}
}
}
return GVKs
}

// supportsDeleteVerbOnly is a predicate matching a resource if it supports the delete verb, but not deletecollection.
type supportsDeleteVerbOnly struct{}

return types, nil
func (p supportsDeleteVerbOnly) Match(groupVersion string, r *metav1.APIResource) bool {
verbs := sets.NewString([]string(r.Verbs)...)
return verbs.Has("delete") && !verbs.Has("deletecollection")
}

func (t *garbageCollectorTrait) discoveryClient(e *Environment) (discovery.DiscoveryInterface, error) {
DiscoveryClientLock.Lock()
defer DiscoveryClientLock.Unlock()

switch *t.DiscoveryCache {
case diskDiscoveryCache:
if diskCachedDiscoveryClient != nil {
return diskCachedDiscoveryClient, nil
}
config := t.client.GetConfig()
httpCacheDir := filepath.Join(mustHomeDir(), ".kube", "http-cache")
diskCacheDir := filepath.Join(mustHomeDir(), ".kube", "cache", "discovery", toHostDir(config.Host))
var err error
diskCachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(config, diskCacheDir, httpCacheDir, 10*time.Minute)
return diskCachedDiscoveryClient, err

case memoryDiscoveryCache:
if memoryCachedDiscoveryClient != nil {
return memoryCachedDiscoveryClient, nil
}
memoryCachedDiscoveryClient = memory.NewMemCacheClient(t.client.Discovery())
return memoryCachedDiscoveryClient, nil

case disabledDiscoveryCache, "":
return t.client.Discovery(), nil

default:
t.L.ForIntegration(e.Integration).Infof("unsupported discovery cache type: %s", *t.DiscoveryCache)
return t.client.Discovery(), nil
}
}
14 changes: 14 additions & 0 deletions pkg/trait/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package trait
import (
"context"
"fmt"
"os/user"
"reflect"
"regexp"
"strings"
Expand Down Expand Up @@ -166,3 +167,16 @@ func decodeTraitSpec(in *v1alpha1.TraitSpec, target interface{}) error {

return decoder.Decode(in.Configuration)
}

func mustHomeDir() string {
usr, err := user.Current()
if err != nil {
panic(err)
}
return usr.HomeDir
}

func toHostDir(host string) string {
h := strings.Replace(strings.Replace(host, "https://", "", 1), "http://", "", 1)
return toFileName.ReplaceAllString(h, "_")
}
4 changes: 4 additions & 0 deletions pkg/util/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ type MatchingSelector struct {
func (s MatchingSelector) ApplyToList(opts *client.ListOptions) {
opts.LabelSelector = s.Selector
}

func (s MatchingSelector) ApplyToDeleteAllOf(opts *client.DeleteAllOfOptions) {
opts.LabelSelector = s.Selector
}
Loading