Skip to content

Commit

Permalink
Merge pull request #2676 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…2663-to-release-0.17

[release-0.17] 🐛 Clean restmapper cache if a version is notFound
  • Loading branch information
k8s-ci-robot authored Feb 8, 2024
2 parents 11e5a5e + 0811bad commit 5923139
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 27 deletions.
43 changes: 33 additions & 10 deletions pkg/client/apiutil/restmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,28 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
// If we encounter a missing API version (NotFound error), we will remove the group from
// the m.apiGroups and m.knownGroups caches.
// If this happens, in the next call the group will be added back to apiGroups
// and only the existing versions will be loaded in knownGroups.
groupVersionResources, err := m.fetchGroupVersionResourcesLocked(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources

if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
for groupVersion, resources := range groupVersionResources {
version := groupVersion.Version

groupResources.VersionedResources[version] = resources.APIResources
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
Expand Down Expand Up @@ -265,18 +270,26 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
return m.apiGroups[groupName], nil
}

// fetchGroupVersionResources fetches the resources for the specified group and its versions.
func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions.
// This method might modify the cache so it needs to be called under the lock.
func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)

for _, version := range versions {
groupVersion := schema.GroupVersion{Group: groupName, Version: version}

apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if err != nil && !apierrors.IsNotFound(err) {
if apierrors.IsNotFound(err) && m.isGroupVersionCached(groupVersion) {
// If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call.
delete(m.apiGroups, groupName)
delete(m.knownGroups, groupName)
continue
} else if err != nil {
failedGroups[groupVersion] = err
}

if apiResourceList != nil {
// even in case of error, some fallback might have been returned.
groupVersionResources[groupVersion] = apiResourceList
Expand All @@ -290,3 +303,13 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string

return groupVersionResources, nil
}

// isGroupVersionCached checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version]
return cached
}

return false
}
168 changes: 151 additions & 17 deletions pkg/client/apiutil/restmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
gomegatypes "github.com/onsi/gomega/types"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -529,23 +531,7 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register another CRD in runtime - "riders.crew.example.com".

crd := &apiextensionsv1.CustomResourceDefinition{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Name = "riders.crew.example.com"
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Rider",
Plural: "riders",
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed())
createNewCRD(context.TODO(), g, c, "crew.example.com", "Rider", "riders")

// Wait a bit until the CRD is registered.
g.Eventually(func() error {
Expand All @@ -564,6 +550,153 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider"))
})

t.Run("LazyRESTMapper should invalidate the group cache if a version is not found", func(t *testing.T) {
g := gmg.NewWithT(t)
ctx := context.Background()

httpClient, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())

crt := newCountingRoundTripper(httpClient.Transport)
httpClient.Transport = crt

lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
g.Expect(err).NotTo(gmg.HaveOccurred())

s := scheme.Scheme
err = apiextensionsv1.AddToScheme(s)
g.Expect(err).NotTo(gmg.HaveOccurred())

c, err := client.New(restCfg, client.Options{Scheme: s})
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register a new CRD ina new group to avoid collisions when deleting versions - "taxis.inventory.example.com".
group := "inventory.example.com"
kind := "Taxi"
plural := "taxis"
crdName := plural + "." + group
// Create a CRD with two versions: v1alpha1 and v1 where both are served and
// v1 is the storage version so we can easily remove v1alpha1 later.
crd := newCRD(ctx, g, c, group, kind, plural)
v1alpha1 := crd.Spec.Versions[0]
v1alpha1.Name = "v1alpha1"
v1alpha1.Storage = false
v1alpha1.Served = true
v1 := crd.Spec.Versions[0]
v1.Name = "v1"
v1.Storage = true
v1.Served = true
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1alpha1, v1}
g.Expect(c.Create(ctx, crd)).To(gmg.Succeed())
t.Cleanup(func() {
g.Expect(c.Delete(ctx, crd)).To(gmg.Succeed())
})

// Wait until the CRD is registered.
discHTTP, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())
discClient, err := discovery.NewDiscoveryClientForConfigAndClient(restCfg, discHTTP)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
}).Should(gmg.Succeed(), "v1 should be available")

// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1alpha1
// #4: GET https://host/apis/inventory.example.com/v1
// This should fill the cache for apiGroups and versions.
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
crt.Reset() // We reset the counter to check how many additional requests are made later.

// At this point v1alpha1 should be cached
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We update the CRD to only have v1 version.
g.Expect(c.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(gmg.Succeed())
for _, version := range crd.Spec.Versions {
if version.Name == "v1" {
v1 = version
break
}
}
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1}
g.Expect(c.Update(ctx, crd)).To(gmg.Succeed())

// We wait until v1alpha1 is not available anymore.
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1alpha1")
g.Expect(apierrors.IsNotFound(err)).To(gmg.BeTrue(), "v1alpha1 should not be available anymore")
}).Should(gmg.Succeed())

// Although v1alpha1 is not available anymore, the cache is not invalidated yet so it should return a mapping.
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We request Limo, which is not in the mapper because it doesn't exist.
// This will trigger a reload of the lazy mapper cache.
// Reloading the cache will read v2 again and since it's not available anymore, it should invalidate the cache.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
// #2: GET https://host/apis/inventory.example.com/v1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: "Limo"})
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))
crt.Reset()

// Now we request v1alpha1 again and it should return an error since the cache was invalidated.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))

// Verify that when requesting the mapping without a version, it doesn't error
// and it returns v1.
mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.Resource.Version).To(gmg.Equal("v1"))
})
}

// createNewCRD creates a new CRD with the given group, kind, and plural and returns it.
func createNewCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
newCRD := newCRD(ctx, g, c, group, kind, plural)
g.Expect(c.Create(ctx, newCRD)).To(gmg.Succeed())

return newCRD
}

// newCRD returns a new CRD with the given group, kind, and plural.
func newCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
crd := &apiextensionsv1.CustomResourceDefinition{}
err := c.Get(ctx, types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Spec.Group = group
newCRD.Name = plural + "." + group
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: kind,
Plural: plural,
}
newCRD.ResourceVersion = ""

return newCRD
}

func beNoMatchError() gomegatypes.GomegaMatcher {
Expand Down Expand Up @@ -594,6 +727,7 @@ func (e *errorMatcher) Match(actual interface{}) (success bool, err error) {
func (e *errorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("to be %s error", e.message))
}

func (e *errorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("not to be %s error", e.message))
}

0 comments on commit 5923139

Please sign in to comment.