Skip to content

Commit

Permalink
Merge pull request #2783 from alvaroaleman/compatible-generics
Browse files Browse the repository at this point in the history
⚠️ Source, Event, Predicate, Handler: Add generics support
  • Loading branch information
k8s-ci-robot authored Apr 22, 2024
2 parents a92b961 + 2add01e commit ae0f6ab
Show file tree
Hide file tree
Showing 24 changed files with 773 additions and 614 deletions.
6 changes: 3 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
60 changes: 34 additions & 26 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -56,6 +55,7 @@ const (
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
Expand Down Expand Up @@ -123,8 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
obj client.Object
handler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -133,10 +133,19 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
src := source.Kind(blder.mgr.GetCache(), object)
return blder.WatchesRawSource(src, eventHandler, opts...)
input := WatchesInput{
obj: object,
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}

blder.watchesInput = append(blder.watchesInput, input)

return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -176,13 +185,11 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand Down Expand Up @@ -272,11 +279,11 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -290,7 +297,6 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand All @@ -302,27 +308,29 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
projected, err := blder.project(w.obj, w.objectProjection)
if err != nil {
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
return err
}
}
for _, src := range blder.rawSources {
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
Named("my_controller").
Build(noop)
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")))
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")))
Expect(instance).To(BeNil())
})

Expand Down
11 changes: 2 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -84,13 +82,8 @@ type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler

// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// Watch watches the provided Source.
Watch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down
7 changes: 4 additions & 3 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ var _ = Describe("controller", func() {

By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand All @@ -101,7 +101,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(c.Watch(watch)).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,55 @@ package event

import "sigs.k8s.io/controller-runtime/pkg/client"

// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
type CreateEvent = TypedCreateEvent[client.Object]

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent = TypedUpdateEvent[client.Object]

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type CreateEvent struct {
type DeleteEvent = TypedDeleteEvent[client.Object]

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent = TypedGenericEvent[client.Object]

// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent struct {
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct {
// ObjectOld is the object from the event
ObjectOld client.Object
ObjectOld T

// ObjectNew is the object from the event
ObjectNew client.Object
ObjectNew T
}

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type DeleteEvent struct {
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T

// DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted.
DeleteStateUnknown bool
}

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler.
type TypedGenericEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}
Loading

0 comments on commit ae0f6ab

Please sign in to comment.