Skip to content

Commit

Permalink
Change where transformers are called.
Browse files Browse the repository at this point in the history
Kubernetes-commit: e76dff38cf74c3c8ad9ed4d3bc6e3641d9b64565
  • Loading branch information
lavalamp authored and k8s-publishing-bot committed Mar 14, 2023
1 parent 308e6b1 commit c3b84f0
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 30 deletions.
22 changes: 2 additions & 20 deletions tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,17 +394,6 @@ func NewIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}

// TransformFunc allows for transforming an object before it will be processed
// and put into the controller cache and before the corresponding handlers will
// be called on it.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
// given controller doesn't care for them
type TransformFunc func(interface{}) (interface{}, error)

// NewTransformingInformer returns a Store and a controller for populating
// the store while also providing event notifications. You should only used
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
Expand Down Expand Up @@ -452,20 +441,12 @@ func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}

switch d.Type {
case Sync, Replaced, Added, Updated:
Expand Down Expand Up @@ -517,6 +498,7 @@ func newInformer(
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})

cfg := &Config{
Expand All @@ -528,7 +510,7 @@ func newInformer(

Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas, isInInitialList)
return processDeltas(h, clientState, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
Expand Down
4 changes: 2 additions & 2 deletions tools/cache/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
fcache "k8s.io/client-go/tools/cache/testing"

"github.com/google/gofuzz"
fuzz "github.com/google/gofuzz"
)

func Example() {
Expand Down
44 changes: 44 additions & 0 deletions tools/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool

// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
}

// DeltaFIFO is like FIFO, but differs in two ways. One is that the
Expand Down Expand Up @@ -129,8 +133,32 @@ type DeltaFIFO struct {
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool

// Called with every object if non-nil.
transformer TransformFunc
}

// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)

// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

Expand Down Expand Up @@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
knownObjects: opts.KnownObjects,

emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
Expand Down Expand Up @@ -415,6 +444,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}

// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}

oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
Expand Down
82 changes: 82 additions & 0 deletions tools/cache/delta_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
}
}

type rvAndXfrm struct {
rv int
xfrm int
}

func TestDeltaFIFO_transformer(t *testing.T) {
mk := func(name string, rv int) testFifoObject {
return mkFifoObj(name, &rvAndXfrm{rv, 0})
}
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
switch v := obj.(type) {
case testFifoObject:
v.val.(*rvAndXfrm).xfrm++
case DeletedFinalStateUnknown:
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
}
default:
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
return obj, nil
})

must := func(err error) {
if err != nil {
t.Fatal(err)
}
}

f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
Transformer: xfrm,
})
must(f.Add(mk("foo", 10)))
must(f.Add(mk("bar", 11)))
must(f.Update(mk("foo", 12)))
must(f.Delete(mk("foo", 15)))
must(f.Replace([]interface{}{}, ""))
must(f.Add(mk("bar", 16)))
must(f.Replace([]interface{}{}, ""))

// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}

for i := 0; i < 2; i++ {
obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil })
if err != nil {
t.Fatalf("got nothing on try %v?", i)
}
obj = obj.(Deltas).Newest().Object
switch v := obj.(type) {
case testFifoObject:
if v.name != "foo" {
t.Errorf("expected regular deletion of foo, got %q", v.name)
}
rx := v.val.(*rvAndXfrm)
if rx.rv != 15 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("obj %v transformed wrong number of times.", obj)
}
case DeletedFinalStateUnknown:
tf := v.Obj.(testFifoObject)
rx := tf.val.(*rvAndXfrm)
if tf.name != "bar" {
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
}
if rx.rv != 16 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
}
default:
t.Errorf("unknown item %#v", obj)
}
}
}

func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("foo", 10))
Expand Down
8 changes: 3 additions & 5 deletions tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ type SharedInformer interface {
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
// Please see the comment on TransformFunc for more details.
SetTransform(handler TransformFunc) error

// IsStopped reports whether the informer has already been stopped.
Expand Down Expand Up @@ -465,6 +462,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})

cfg := &Config{
Expand Down Expand Up @@ -637,7 +635,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
defer s.blockDeltas.Unlock()

if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
Expand Down
5 changes: 2 additions & 3 deletions tools/cache/shared_informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,8 @@ func TestSharedInformerTransformer(t *testing.T) {
name := pod.GetName()

if upper := strings.ToUpper(name); upper != name {
copied := pod.DeepCopyObject().(*v1.Pod)
copied.SetName(upper)
return copied, nil
pod.SetName(upper)
return pod, nil
}
}
return obj, nil
Expand Down

0 comments on commit c3b84f0

Please sign in to comment.