Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/ansible/proxy: adding ability for dependent watches to be recovered #1067

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 116 additions & 86 deletions pkg/ansible/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type marshaler interface {
// resource exists in our cache. If it does then there is no need to bombard
// the APIserver with our request and we should write the response from the
// proxy.
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}, cMap *controllermap.ControllerMap, injectOwnerRef bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
Expand Down Expand Up @@ -138,17 +138,39 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
}
m = &un
} else {
un := unstructured.Unstructured{}
un := &unstructured.Unstructured{}
un.SetGroupVersionKind(k)
obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name}
err = informerCache.Get(context.Background(), obj, &un)
err = informerCache.Get(context.Background(), obj, un)
if err != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj))
break
}
m = &un
m = un
// Once we get the resource, we are going to attempt to recover the dependent watches here,
// This will happen in the background, and log errors.
if injectOwnerRef {
go func() {
ownerRef, err := getRequestOwnerRef(req)
if err != nil {
log.Error(err, "Could not get ownerRef from proxy")
return
}

for _, oRef := range un.GetOwnerReferences() {
if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind {
err := addWatchToController(ownerRef, cMap, un, restMapper)
if err != nil {
log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
return
}
}
}
}()
}

}

i := bytes.Buffer{}
Expand Down Expand Up @@ -184,7 +206,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
// InjectOwnerReferenceHandler will handle proxied requests and inject the
// owner refernece found in the authorization header. The Authorization is
// then deleted so that the proxy can re-set with the correct authorization.
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper) http.Handler {
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodPost:
Expand All @@ -203,29 +225,9 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
break
}
log.Info("Injecting owner reference")

user, _, ok := req.BasicAuth()
if !ok {
log.Error(errors.New("basic auth header not found"), "")
w.Header().Set("WWW-Authenticate", "Basic realm=\"Operator Proxy\"")
http.Error(w, "", http.StatusUnauthorized)
return
}
authString, err := base64.StdEncoding.DecodeString(user)
owner, err := getRequestOwnerRef(req)
if err != nil {
m := "Could not base64 decode username"
log.Error(err, m)
http.Error(w, m, http.StatusBadRequest)
return
}
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
// as a subset along with the Namespace of the owner. Please see the
// kubeconfig.NamespacedOwnerReference type for more information. The
// namespace is required when creating the reconcile requests.
owner := kubeconfig.NamespacedOwnerReference{}
json.Unmarshal(authString, &owner)
if err := json.Unmarshal(authString, &owner); err != nil {
m := "Could not unmarshal auth string"
m := "Could not get owner reference"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
Expand Down Expand Up @@ -257,35 +259,23 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
log.V(1).Info("Serialized body", "Body", string(newBody))
req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody))
req.ContentLength = int64(len(newBody))
dataMapping, err := restMapper.RESTMapping(data.GroupVersionKind().GroupKind(), data.GroupVersionKind().Version)
if err != nil {
m := fmt.Sprintf("Could not get rest mapping for: %v", data.GroupVersionKind())
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}
// We need to determine whether or not the owner is a cluster-scoped
// resource because enqueue based on an owner reference does not work if
// a namespaced resource owns a cluster-scoped resource
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
if err != nil {
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}

dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
// add watch for resource
err = addWatchToController(owner, cMap, data, useOwnerReference)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
// check if resource doesn't exist in watched namespaces
// if watchedNamespaces[""] exists then we are watching all namespaces
// and want to continue
// This is making sure we are not attempting to watch a resource outside of the
// namespaces that the cache can watch.
_, allNsPresent := watchedNamespaces[metav1.NamespaceAll]
_, reqNsPresent := watchedNamespaces[r.Namespace]
if allNsPresent || reqNsPresent {
err = addWatchToController(owner, cMap, data, restMapper)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}
}
}
// Removing the authorization so that the proxy can set the correct authorization.
Expand All @@ -294,6 +284,7 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
})
}

// RequestLogHandler - log the requests that come through the proxy.
func RequestLogHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// read body
Expand Down Expand Up @@ -379,13 +370,13 @@ func Run(done chan error, o Options) error {
}

if !o.NoOwnerInjection {
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper)
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper, watchedNamespaceMap)
}
if o.LogRequests {
server.Handler = RequestLogHandler(server.Handler)
}
if !o.DisableCache {
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap)
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap, o.ControllerMap, !o.NoOwnerInjection)
}

l, err := server.Listen(o.Address, o.Port)
Expand All @@ -399,57 +390,71 @@ func Run(done chan error, o Options) error {
return nil
}

func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, useOwnerReference bool) error {
gv, err := schema.ParseGroupVersion(owner.APIVersion)
func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper) error {
dataMapping, err := restMapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version)
if err != nil {
m := fmt.Sprintf("Could not get rest mapping for: %v", resource.GroupVersionKind())
log.Error(err, m)
return err

}
// We need to determine whether or not the owner is a cluster-scoped
// resource because enqueue based on an owner reference does not work if
// a namespaced resource owns a cluster-scoped resource
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
m := fmt.Sprintf("could not get broup version for: %v", owner)
log.Error(err, m)
return err
}
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: owner.Kind,
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
if err != nil {
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
log.Error(err, m)
return err
}
contents, ok := cMap.Get(gvk)

dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
contents, ok := cMap.Get(ownerMapping.GroupVersionKind)
if !ok {
return errors.New("failed to find controller in map")
}
wMap := contents.WatchMap
uMap := contents.UIDMap
// Store UID
uMap.Store(owner.UID, types.NamespacedName{
Name: owner.Name,
Namespace: owner.Namespace,
})
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
u.SetGroupVersionKind(ownerMapping.GroupVersionKind)
// Add a watch to controller
if contents.WatchDependentResources {
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources
if useOwnerReference && !contents.WatchClusterScopedResources {
_, exists := wMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
// Store UID
uMap.Store(owner.UID, types.NamespacedName{
Name: owner.Name,
Namespace: owner.Namespace,
})
_, exists := wMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
// Store watch in map
wMap.Store(resource.GroupVersionKind())
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources and we have to
if useOwnerReference {
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
wMap.Store(resource.GroupVersionKind())
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
} else if contents.WatchClusterScopedResources {
// Use Map func since EnqueuRequestForOwner won't work
// Check if resource is already watched
_, exists := wMap.Get(resource.GroupVersionKind())
if exists {
return nil
if err != nil {
return err
}
} else if contents.WatchClusterScopedResources {
log.Info("Watching child resource which can be cluster-scoped", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
wMap.Store(resource.GroupVersionKind())
// Add watch
err = contents.Controller.Watch(
&source.Kind{Type: resource},
// Use Map func since EnqueuRequestForOwner won't work
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
log.V(2).Info("Creating reconcile request from object", "gvk", gvk, "name", a.Meta.GetName())
log.V(2).Info("Creating reconcile request from object", "gvk", ownerMapping.GroupVersionKind, "name", a.Meta.GetName())
ownRefs := a.Meta.GetOwnerReferences()
for _, ref := range ownRefs {
nn, exists := uMap.Get(ref.UID)
Expand All @@ -470,3 +475,28 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
}
return nil
}

func getRequestOwnerRef(req *http.Request) (kubeconfig.NamespacedOwnerReference, error) {
owner := kubeconfig.NamespacedOwnerReference{}
user, _, ok := req.BasicAuth()
if !ok {
return owner, errors.New("basic auth header not found")
}
authString, err := base64.StdEncoding.DecodeString(user)
if err != nil {
m := "Could not base64 decode username"
log.Error(err, m)
return owner, err
}
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
// as a subset along with the Namespace of the owner. Please see the
// kubeconfig.NamespacedOwnerReference type for more information. The
// namespace is required when creating the reconcile requests.
json.Unmarshal(authString, &owner)
if err := json.Unmarshal(authString, &owner); err != nil {
m := "Could not unmarshal auth string"
log.Error(err, m)
return owner, err
}
return owner, err
}
53 changes: 53 additions & 0 deletions test/ansible-memcached/asserts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,59 @@
assert:
that: cr.resources[0].status.get("test") == "hello world"
when: cr is defined
- when: molecule_yml.scenario.name == "test-local"
block:
- name: Restart the operator by killing the pod
k8s:
state: absent
definition:
api_version: v1
kind: Pod
metadata:
namespace: '{{ namespace }}'
name: '{{ pod.metadata.name }}'
vars:
pod: '{{ q("k8s", api_version="v1", kind="Pod", namespace=namespace, label_selector="name=memcached-operator").0 }}'

- name: Wait 2 minutes for operator deployment
debug:
var: deploy
until: deploy and deploy.status and deploy.status.replicas == deploy.status.get("availableReplicas", 0)
retries: 12
delay: 10
vars:
deploy: '{{ lookup("k8s",
kind="Deployment",
api_version="apps/v1",
namespace=namespace,
resource_name="memcached-operator"
)}}'

- name: Wait for reconcilation to have a chance at finishing
pause:
seconds: 15

- name: Delete the service that is created.
k8s:
kind: Service
api_version: v1
namespace: '{{ namespace }}'
name: test-service
state: absent

- name: Verify that test-service was re-created
debug:
var: service
until: service
retries: 12
delay: 10
vars:
service: '{{ lookup("k8s",
kind="Service",
api_version="v1",
namespace=namespace,
resource_name="test-service",
)}}'

- name: Delete the custom resource
k8s:
Expand Down