Skip to content

Commit

Permalink
pkg/ansible: Bugfix/update owner ref behavior (#1148)
Browse files Browse the repository at this point in the history
* Handles watching dependent resources across namespace boundaries
* handles namespaced resource watching cluster resource
* handles namespaced resource watching namespaced resource in another
namespace
* Removing owner references crossing namespace boundaries
* Fixing controller map to watch gvk's across namespaces
* need to handle same GVK with different enqueue behavior
  • Loading branch information
Shawn Hurley authored Apr 2, 2019
1 parent e89a1eb commit f9d902f
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 111 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/ansible/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags, cM
done <- errors.New("failed to add controller")
return
}
cMap.Store(o.GVK, &controllermap.ControllerMapContents{Controller: *ctr,
cMap.Store(o.GVK, &controllermap.Contents{Controller: *ctr,
WatchDependentResources: runner.GetWatchDependentResources(),
WatchClusterScopedResources: runner.GetWatchClusterScopedResources(),
WatchMap: controllermap.NewWatchMap(),
UIDMap: controllermap.NewUIDMap(),
OwnerWatchMap: controllermap.NewWatchMap(),
AnnotationWatchMap: controllermap.NewWatchMap(),
})
}
done <- mgr.Start(c)
Expand Down
52 changes: 8 additions & 44 deletions pkg/ansible/proxy/controllermap/controllermap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,13 @@ import (
"sync"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

// ControllerMap - map of GVK to ControllerMapContents
type ControllerMap struct {
mutex sync.RWMutex
internal map[schema.GroupVersionKind]*ControllerMapContents
}

// UIDMap - map of UID to namespaced name of owner
type UIDMap struct {
mutex sync.RWMutex
internal map[types.UID]types.NamespacedName
internal map[schema.GroupVersionKind]*Contents
}

// WatchMap - map of GVK to interface. Determines if resource is being watched already
Expand All @@ -40,20 +33,20 @@ type WatchMap struct {
internal map[schema.GroupVersionKind]interface{}
}

// ControllerMapContents- Contains internal data associated with each controller
type ControllerMapContents struct {
// Contents - Contains internal data associated with each controller
type Contents struct {
Controller controller.Controller
WatchDependentResources bool
WatchClusterScopedResources bool
WatchMap *WatchMap
UIDMap *UIDMap
OwnerWatchMap *WatchMap
AnnotationWatchMap *WatchMap
}

// NewControllerMap returns a new object that contains a mapping between GVK
// and ControllerMapContents object
func NewControllerMap() *ControllerMap {
return &ControllerMap{
internal: make(map[schema.GroupVersionKind]*ControllerMapContents),
internal: make(map[schema.GroupVersionKind]*Contents),
}
}

Expand All @@ -65,16 +58,9 @@ func NewWatchMap() *WatchMap {
}
}

// NewUIDMap - returns a new object that maps UID to namespaced name of owner
func NewUIDMap() *UIDMap {
	m := &UIDMap{}
	m.internal = make(map[types.UID]types.NamespacedName)
	return m
}

// Get - Returns a ControllerMapContents given a GVK as the key. `ok`
// determines if the key exists
func (cm *ControllerMap) Get(key schema.GroupVersionKind) (value *ControllerMapContents, ok bool) {
func (cm *ControllerMap) Get(key schema.GroupVersionKind) (value *Contents, ok bool) {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
value, ok = cm.internal[key]
Expand All @@ -89,7 +75,7 @@ func (cm *ControllerMap) Delete(key schema.GroupVersionKind) {
}

// Store - Adds a new GVK to controller mapping
func (cm *ControllerMap) Store(key schema.GroupVersionKind, value *ControllerMapContents) {
func (cm *ControllerMap) Store(key schema.GroupVersionKind, value *Contents) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
cm.internal[key] = value
Expand All @@ -116,25 +102,3 @@ func (wm *WatchMap) Store(key schema.GroupVersionKind) {
defer wm.mutex.Unlock()
wm.internal[key] = nil
}

// Get - Returns a NamespacedName of the owner given a UID
func (um *UIDMap) Get(key types.UID) (value types.NamespacedName, ok bool) {
um.mutex.RLock()
defer um.mutex.RUnlock()
value, ok = um.internal[key]
return value, ok
}

// Delete - Deletes associated UID to NamespacedName mapping
func (um *UIDMap) Delete(key types.UID) {
um.mutex.Lock()
defer um.mutex.Unlock()
delete(um.internal, key)
}

// Store - Adds a new UID to NamespacedName mapping
func (um *UIDMap) Store(key types.UID, value types.NamespacedName) {
um.mutex.Lock()
defer um.mutex.Unlock()
um.internal[key] = value
}
182 changes: 118 additions & 64 deletions pkg/ansible/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@ import (
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/kubeconfig"
k8sRequest "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/requestfactory"
osdkHandler "github.com/operator-framework/operator-sdk/pkg/handler"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

Expand Down Expand Up @@ -152,25 +151,8 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
// Once we get the resource, we are going to attempt to recover the dependent watches here,
// This will happen in the background, and log errors.
if injectOwnerRef {
go func() {
ownerRef, err := getRequestOwnerRef(req)
if err != nil {
log.Error(err, "Could not get ownerRef from proxy")
return
}

for _, oRef := range un.GetOwnerReferences() {
if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind {
err := addWatchToController(ownerRef, cMap, un, restMapper)
if err != nil {
log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
return
}
}
}
}()
go recoverDependentWatches(req, un, cMap, restMapper)
}

}

i := bytes.Buffer{}
Expand Down Expand Up @@ -203,6 +185,38 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
})
}

// recoverDependentWatches re-establishes watches on the dependent resource
// `un` for the owner identified in the proxied request's authorization
// header. It is best-effort and runs in the background: failures are logged
// and the function simply returns. Two enqueue strategies are recovered:
//   - owner-reference based (useOwnerRef=true): the resource carries an
//     ownerReference whose APIVersion and Kind match the requesting owner
//   - annotation based (useOwnerRef=false): the resource carries the
//     operator-sdk type annotation matching "<Kind>.<group>" of the owner,
//     used where an owner reference cannot cross namespace/scope boundaries
func recoverDependentWatches(req *http.Request, un *unstructured.Unstructured, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper) {
	ownerRef, err := getRequestOwnerRef(req)
	if err != nil {
		log.Error(err, "Could not get ownerRef from proxy")
		return
	}

	for _, oRef := range un.GetOwnerReferences() {
		if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind {
			err := addWatchToController(ownerRef, cMap, un, restMapper, true)
			if err != nil {
				log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
				return
			}
		}
	}
	if typeString, ok := un.GetAnnotations()[osdkHandler.TypeAnnotation]; ok {
		ownerGV, err := schema.ParseGroupVersion(ownerRef.APIVersion)
		if err != nil {
			// The owner's APIVersion must parse so we can compare its
			// "<Kind>.<group>" form against the type annotation.
			log.Error(err, "Could not parse APIVersion of ownerRef", "api_version", ownerRef.APIVersion)
			return
		}
		if typeString == fmt.Sprintf("%v.%v", ownerRef.Kind, ownerGV.Group) {
			err := addWatchToController(ownerRef, cMap, un, restMapper, false)
			if err != nil {
				log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
				return
			}
		}
	}
}

// InjectOwnerReferenceHandler will handle proxied requests and inject the
// owner reference found in the authorization header. The Authorization is
// then deleted so that the proxy can re-set with the correct authorization.
Expand Down Expand Up @@ -248,7 +262,33 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
http.Error(w, m, http.StatusBadRequest)
return
}
data.SetOwnerReferences(append(data.GetOwnerReferences(), owner.OwnerReference))

addOwnerRef, err := shouldAddOwnerRef(data, owner, restMapper)
if err != nil {
m := "Could not determine if we should add owner ref"
log.Error(err, m)
http.Error(w, m, http.StatusBadRequest)
return
}
if addOwnerRef {
data.SetOwnerReferences(append(data.GetOwnerReferences(), owner.OwnerReference))
} else {
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
m := fmt.Sprintf("could not get broup version for: %v", owner)
log.Error(err, m)
http.Error(w, m, http.StatusBadRequest)
return
}
a := data.GetAnnotations()
if a == nil {
a = map[string]string{}
}
a[osdkHandler.NamespacedNameAnnotation] = strings.Join([]string{owner.Namespace, owner.Name}, "/")
a[osdkHandler.TypeAnnotation] = fmt.Sprintf("%v.%v", owner.Kind, ownerGV.Group)

data.SetAnnotations(a)
}
newBody, err := json.Marshal(data.Object)
if err != nil {
m := "Could not serialize body"
Expand All @@ -269,7 +309,7 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
_, allNsPresent := watchedNamespaces[metav1.NamespaceAll]
_, reqNsPresent := watchedNamespaces[r.Namespace]
if allNsPresent || reqNsPresent {
err = addWatchToController(owner, cMap, data, restMapper)
err = addWatchToController(owner, cMap, data, restMapper, addOwnerRef)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
Expand All @@ -289,6 +329,39 @@ func removeAuthorizationHeader(h http.Handler) http.Handler {
})
}

// shouldAddOwnerRef - reports whether an owner reference from `owner` may
// safely be attached to `data`. Enqueueing based on an owner reference only
// works when both resources are namespace-scoped and live in the same
// namespace; in every other case the caller must fall back to the
// annotation-based mechanism. A non-nil error means the scope of one of the
// resources could not be determined.
func shouldAddOwnerRef(data *unstructured.Unstructured, owner kubeconfig.NamespacedOwnerReference, restMapper meta.RESTMapper) (bool, error) {
	dataGVK := data.GroupVersionKind()
	dataMapping, err := restMapper.RESTMapping(dataGVK.GroupKind(), dataGVK.Version)
	if err != nil {
		m := fmt.Sprintf("Could not get rest mapping for: %v", data.GroupVersionKind())
		log.Error(err, m)
		return false, err
	}
	// We need to determine whether or not the owner is a cluster-scoped
	// resource because enqueue based on an owner reference does not work if
	// a namespaced resource owns a cluster-scoped resource
	ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
	if err != nil {
		m := fmt.Sprintf("could not get group version for: %v", owner)
		log.Error(err, m)
		return false, err
	}
	ownerGK := schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}
	ownerMapping, err := restMapper.RESTMapping(ownerGK, ownerGV.Version)
	if err != nil {
		m := fmt.Sprintf("could not get rest mapping for: %v", owner)
		log.Error(err, m)
		return false, err
	}

	// A resource is namespace-scoped iff its REST scope is not the root
	// (cluster) scope.
	dataNamespaced := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
	ownerNamespaced := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot

	return dataNamespaced && ownerNamespaced && data.GetNamespace() == owner.Namespace, nil
}

// RequestLogHandler - log the requests that come through the proxy.
func RequestLogHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -399,17 +472,14 @@ func Run(done chan error, o Options) error {
return nil
}

func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper) error {
func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper, useOwnerRef bool) error {
dataMapping, err := restMapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version)
if err != nil {
m := fmt.Sprintf("Could not get rest mapping for: %v", resource.GroupVersionKind())
log.Error(err, m)
return err

}
// We need to determine whether or not the owner is a cluster-scoped
// resource because enqueue based on an owner reference does not work if
// a namespaced resource owns a cluster-scoped resource
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
m := fmt.Sprintf("could not get broup version for: %v", owner)
Expand All @@ -424,59 +494,43 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
}

dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
contents, ok := cMap.Get(ownerMapping.GroupVersionKind)
if !ok {
return errors.New("failed to find controller in map")
}
wMap := contents.WatchMap
uMap := contents.UIDMap
owMap := contents.OwnerWatchMap
awMap := contents.AnnotationWatchMap
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(ownerMapping.GroupVersionKind)
// Add a watch to controller
if contents.WatchDependentResources {
// Store UID
uMap.Store(owner.UID, types.NamespacedName{
Name: owner.Name,
Namespace: owner.Namespace,
})
_, exists := wMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
// Store watch in map
wMap.Store(resource.GroupVersionKind())
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources and we have to
if useOwnerReference {
switch {
case useOwnerRef:
_, exists := owMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}

owMap.Store(resource.GroupVersionKind())
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
err := contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
if err != nil {
return err
}
} else if contents.WatchClusterScopedResources {
log.Info("Watching child resource which can be cluster-scoped", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Add watch
err = contents.Controller.Watch(
&source.Kind{Type: resource},
// Use Map func since EnqueuRequestForOwner won't work
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
log.V(2).Info("Creating reconcile request from object", "gvk", ownerMapping.GroupVersionKind, "name", a.Meta.GetName())
ownRefs := a.Meta.GetOwnerReferences()
for _, ref := range ownRefs {
nn, exists := uMap.Get(ref.UID)
if !exists {
continue
}
return []reconcile.Request{
{NamespacedName: nn},
}
}
return nil
})},
)
case (!useOwnerRef && dataNamespaceScoped) || contents.WatchClusterScopedResources:
_, exists := awMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
awMap.Store(resource.GroupVersionKind())
typeString := fmt.Sprintf("%v.%v", owner.Kind, ownerGV.Group)
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_annotation_type", typeString)
err = contents.Controller.Watch(&source.Kind{Type: resource}, &osdkHandler.EnqueueRequestForAnnotation{Type: typeString})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f9d902f

Please sign in to comment.