Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/resource: consistent applicators #491

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
dario.cat/mergo v1.0.0
github.com/bufbuild/buf v1.26.1
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/go-logr/logr v1.2.4
github.com/google/go-cmp v0.5.9
github.com/spf13/afero v1.9.5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/store/kubernetes/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewSecretStore(ctx context.Context, local client.Client, _ *tls.Config, cfg
return &SecretStore{
client: resource.ClientApplicator{
Client: kube,
Applicator: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(kube), resource.IsAPIErrorWrapped, nil),
Applicator: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(kube), resource.IsNonConflictAPIErrorWrapped, nil),
},
defaultNamespace: cfg.DefaultScope,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/managed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewAPISecretPublisher(c client.Client, ot runtime.ObjectTyper) *APISecretPu
// backward compatibility with the original API of this function.
return &APISecretPublisher{
secret: resource.NewApplicatorWithRetry(resource.NewAPIPatchingApplicator(c),
resource.IsAPIErrorWrapped, nil),
resource.IsNonConflictAPIErrorWrapped, nil),
typer: ot,
}
}
Expand Down
140 changes: 99 additions & 41 deletions pkg/resource/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package resource

import (
"bytes"
"context"
"encoding/json"

jsonpatch "github.com/evanphx/json-patch"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -33,6 +35,12 @@ import (
// Error strings.
const (
errUpdateObject = "cannot update object"

// taken from k8s.io/apiserver. Not crucial to match, but for uniformity it
// better should.
// TODO(sttts): import from k8s.io/apiserver/pkg/registry/generic/registry when
// kube has updated otel dependencies post-1.28.
errOptimisticLock = "the object has been modified; please apply your changes to the latest version and try again"
)

// An APIPatchingApplicator applies changes to an object by either creating or
Expand All @@ -50,41 +58,97 @@ func NewAPIPatchingApplicator(c client.Client) *APIPatchingApplicator {
// Apply changes to the supplied object. The object will be created if it does
// not exist, or patched if it does. If the object does exist, it will only be
// patched if the passed object has the same or an empty resource version.
func (a *APIPatchingApplicator) Apply(ctx context.Context, o client.Object, ao ...ApplyOption) error {
m, ok := o.(metav1.Object)
if !ok {
return errors.New("cannot access object metadata")
}

if m.GetName() == "" && m.GetGenerateName() != "" {
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
func (a *APIPatchingApplicator) Apply(ctx context.Context, obj client.Object, ao ...ApplyOption) error {
if obj.GetName() == "" && obj.GetGenerateName() != "" {
return a.client.Create(ctx, obj)
}

desired := o.DeepCopyObject()

err := a.client.Get(ctx, types.NamespacedName{Name: m.GetName(), Namespace: m.GetNamespace()}, o)
current := obj.DeepCopyObject().(client.Object)
err := a.client.Get(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, current)
if kerrors.IsNotFound(err) {
// TODO(negz): Apply ApplyOptions here too?
pedjak marked this conversation as resolved.
Show resolved Hide resolved
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
return a.client.Create(ctx, obj)
}
if err != nil {
return errors.Wrap(err, "cannot get object")
return err
}

for _, fn := range ao {
if err := fn(ctx, o, desired); err != nil {
// Note: this check would ideally not be necessary if the Apply signature
// had a current object that we could use for the diff. But we have no
// current and for consistency of the patch it matters that the object we
// get above is the one that was originally used.
if obj.GetResourceVersion() != "" && obj.GetResourceVersion() != current.GetResourceVersion() {
gvr, err := groupResource(a.client, obj)
if err != nil {
return err
}
return kerrors.NewConflict(gvr, current.GetName(), errors.New(errOptimisticLock))
}

for _, fn := range ao {
if err := fn(ctx, current, obj); err != nil {
return errors.Wrapf(err, "apply option failed for %s", HumanReadableReference(a.client, obj))
}
}

// TODO(negz): Allow callers to override the kind of patch used.
return errors.Wrap(a.client.Patch(ctx, o, &patch{desired}), "cannot patch object")
return a.client.Patch(ctx, obj, client.MergeFromWithOptions(current, client.MergeFromWithOptimisticLock{}))
}

type patch struct{ from runtime.Object }
func groupResource(c client.Client, o client.Object) (schema.GroupResource, error) {
gvk, err := c.GroupVersionKindFor(o)
if err != nil {
return schema.GroupResource{}, errors.Wrapf(err, "cannot determine group version kind of %T", o)
}
m, err := c.RESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return schema.GroupResource{}, errors.Wrapf(err, "cannot determine group resource of %v", gvk)
}
return m.Resource.GroupResource(), nil
}

func (p *patch) Type() types.PatchType { return types.MergePatchType }
func (p *patch) Data(_ client.Object) ([]byte, error) { return json.Marshal(p.from) }
var emptyScheme = runtime.NewScheme() // no need to recognize any types
var jsonSerializer = json.NewSerializerWithOptions(json.DefaultMetaFactory, emptyScheme, emptyScheme, json.SerializerOptions{})

// AdditiveMergePatchApplyOption returns an ApplyOption that makes
// the Apply additive in the sense of a merge patch without null values. This is
// the old behavior of the APIPatchingApplicator.
//
// This only works with a desired object of type *unstructured.Unstructured.
//
// Deprecated: replace with Server Side Apply.
func AdditiveMergePatchApplyOption(_ context.Context, current, desired runtime.Object) error {
// set GVK uniformly to the desired object to make serializer happy
currentGVK, desiredGVK := current.GetObjectKind().GroupVersionKind(), desired.GetObjectKind().GroupVersionKind()
if !desiredGVK.Empty() && currentGVK != desiredGVK {
return errors.Errorf("cannot apply %v to %v", desired.GetObjectKind().GroupVersionKind(), current.GetObjectKind().GroupVersionKind())
}
desired.GetObjectKind().SetGroupVersionKind(currentGVK)

// merge `desired` additively with `current`
var currentBytes, desiredBytes bytes.Buffer
if err := jsonSerializer.Encode(current, &currentBytes); err != nil {
return errors.Wrapf(err, "cannot marshal current %s", HumanReadableReference(nil, current))
}
if err := jsonSerializer.Encode(desired, &desiredBytes); err != nil {
return errors.Wrapf(err, "cannot marshal desired %s", HumanReadableReference(nil, desired))
}
mergedBytes, err := jsonpatch.MergePatch(currentBytes.Bytes(), desiredBytes.Bytes())
if err != nil {
return errors.Wrapf(err, "cannot merge patch to %s", HumanReadableReference(nil, desired))
}

// write merged object back to `desired`
if _, _, err := jsonSerializer.Decode(mergedBytes, nil, desired); err != nil {
return errors.Wrapf(err, "cannot unmarshal merged patch to %s", HumanReadableReference(nil, desired))
}

// restore empty GVK for typed objects
if _, isUnstructured := desired.(runtime.Unstructured); !isUnstructured {
desired.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{})
}

return nil
}

// An APIUpdatingApplicator applies changes to an object by either creating or
// updating it in a Kubernetes API server.
Expand All @@ -94,43 +158,37 @@ type APIUpdatingApplicator struct {

// NewAPIUpdatingApplicator returns an Applicator that applies changes to an
// object by either creating or updating it in a Kubernetes API server.
//
// Deprecated: Use NewAPIPatchingApplicator instead. The updating applicator
// can lead to data-loss if the Golang types in this process are not up-to-date.
func NewAPIUpdatingApplicator(c client.Client) *APIUpdatingApplicator {
return &APIUpdatingApplicator{client: c}
}

// Apply changes to the supplied object. The object will be created if it does
// not exist, or updated if it does.
func (a *APIUpdatingApplicator) Apply(ctx context.Context, o client.Object, ao ...ApplyOption) error {
m, ok := o.(Object)
if !ok {
return errors.New("cannot access object metadata")
func (a *APIUpdatingApplicator) Apply(ctx context.Context, obj client.Object, ao ...ApplyOption) error {
if obj.GetName() == "" && obj.GetGenerateName() != "" {
return a.client.Create(ctx, obj)
}

if m.GetName() == "" && m.GetGenerateName() != "" {
return errors.Wrap(a.client.Create(ctx, o), "cannot create object")
}

current := o.DeepCopyObject().(client.Object)

err := a.client.Get(ctx, types.NamespacedName{Name: m.GetName(), Namespace: m.GetNamespace()}, current)
current := obj.DeepCopyObject().(client.Object)
err := a.client.Get(ctx, types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, current)
if kerrors.IsNotFound(err) {
// TODO(negz): Apply ApplyOptions here too?
return errors.Wrap(a.client.Create(ctx, m), "cannot create object")
return a.client.Create(ctx, obj)
}
if err != nil {
return errors.Wrap(err, "cannot get object")
return err
}

for _, fn := range ao {
if err := fn(ctx, current, m); err != nil {
return err
if err := fn(ctx, current, obj); err != nil {
return errors.Wrapf(err, "apply option failed for %s", HumanReadableReference(a.client, obj))
}
}

// NOTE(hasheddan): we must set the resource version of the desired object
// to that of the current or the update will always fail.
m.SetResourceVersion(current.(metav1.Object).GetResourceVersion())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now update call fails without a resource version:

metadata.resourceVersion: Invalid value: 0x0: must be specified for an update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the object passed to the applicator here is based on a Get, i.e. has a resource version. Any other use of the applicator is invalid.

Copy link
Member

@turkenh turkenh Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We pass the current (which is a deepcopy of obj) to the Get. So, the object passed to Update does not have a resource version.

Copy link
Member

@turkenh turkenh Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, you mean "should be based on a Get" ? Confused because, currently this is not the case for most of the usages of Apply.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that AdditiveMergePatchApplyOption additively merges. This means that the RV from current is copied into desired. Hence, we always have a RV if Get has been successful. And if it hasn't, we create object. So we should be fine.

return errors.Wrap(a.client.Update(ctx, m), "cannot update object")
return a.client.Update(ctx, obj)
Copy link
Contributor

@pedjak pedjak Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wonder in case of error, would be able to see something in logs and correlate it with the previous log line? Also, do we really want to emit log line on INFO?

Should we log here in case of an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the clients produce errors with context. No need to add another one.

Error processing and error logging I leave to the caller. controller-runtime will also log if the error is passed up. If the error is expected, the caller will do the thing and not log. We don't know here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, I would like to see this in action in a proof PR in Crossplane.

}

// An APIFinalizer adds and removes finalizers to and from a resource.
Expand Down
Loading