Skip to content

Commit

Permalink
Clean restmapper cache if a version is notFound
Browse files Browse the repository at this point in the history
This avoids:
- Extra calls to https://host/apis/<group>/<version> when a version seen
  before and cached in apiGroups is deleted or marked as not served.
- Returnning a valid mapping for a cached version that is deleted or not
  served anymore.
  • Loading branch information
g-gaston authored and k8s-infra-cherrypick-robot committed Feb 7, 2024
1 parent 11e5a5e commit 40b41df
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 22 deletions.
30 changes: 25 additions & 5 deletions pkg/client/apiutil/restmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,26 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}

if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources
}

// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
for groupVersion := range groupVersionResources {
version := groupVersion.Version
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
Expand Down Expand Up @@ -266,6 +269,7 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
}

// fetchGroupVersionResources fetches the resources for the specified group and its versions.
// This method might modify the cache so it needs to be called under the lock.
func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
Expand All @@ -274,9 +278,15 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string
groupVersion := schema.GroupVersion{Group: groupName, Version: version}

apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if err != nil && !apierrors.IsNotFound(err) {
if apierrors.IsNotFound(err) && m.isGroupVersionCached(groupVersion) {
// If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call.
delete(m.apiGroups, groupName)
delete(m.knownGroups, groupName)
} else if err != nil {
failedGroups[groupVersion] = err
}

if apiResourceList != nil {
// even in case of error, some fallback might have been returned.
groupVersionResources[groupVersion] = apiResourceList
Expand All @@ -290,3 +300,13 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string

return groupVersionResources, nil
}

// isGroupVersionCached checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version]
return cached
}

return false
}
146 changes: 129 additions & 17 deletions pkg/client/apiutil/restmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
gomegatypes "github.com/onsi/gomega/types"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -529,23 +531,7 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register another CRD in runtime - "riders.crew.example.com".

crd := &apiextensionsv1.CustomResourceDefinition{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Name = "riders.crew.example.com"
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Rider",
Plural: "riders",
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed())
createNewCRD(context.TODO(), g, c, "crew.example.com", "Rider", "riders")

// Wait a bit until the CRD is registered.
g.Eventually(func() error {
Expand All @@ -564,6 +550,131 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider"))
})

t.Run("LazyRESTMapper should invalidate the group cache if a version is not found", func(t *testing.T) {
g := gmg.NewWithT(t)
ctx := context.Background()

httpClient, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())

crt := newCountingRoundTripper(httpClient.Transport)
httpClient.Transport = crt

lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
g.Expect(err).NotTo(gmg.HaveOccurred())

s := scheme.Scheme
err = apiextensionsv1.AddToScheme(s)
g.Expect(err).NotTo(gmg.HaveOccurred())

c, err := client.New(restCfg, client.Options{Scheme: s})
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register a new CRD ina new group to avoid collisions when deleting versions - "taxi.inventory.example.com".
group := "inventory.example.com"
kind := "Taxi"
plural := "taxis"
crdName := plural + "." + group
crd := createNewCRD(ctx, g, c, group, kind, plural)
t.Cleanup(func() {
g.Expect(c.Delete(ctx, crd)).To(gmg.Succeed())
})

// Wait until the CRD is registered.
discHTTP, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())
discClient, err := discovery.NewDiscoveryClientForConfigAndClient(restCfg, discHTTP)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
}).Should(gmg.Succeed(), "v1 should be available")

// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1
// #4: GET https://host/apis/inventory.example.com/v2
// This should fill the cache for apiGroups and versions.
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
crt.Reset() // We reset the counter to check how many additional requests are made later.

// At this point v2 should be cached
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v2")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We update the CRD to only have v1 version.
g.Expect(c.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(gmg.Succeed())
var v1 apiextensionsv1.CustomResourceDefinitionVersion
for i, version := range crd.Spec.Versions {
if version.Name == "v1" {
crd.Spec.Versions[i].Storage = true
v1 = version
v1.Storage = true
}
}
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1}
g.Expect(c.Update(ctx, crd)).To(gmg.Succeed())

// We wait until v2 is not available anymore.
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v2")
g.Expect(apierrors.IsNotFound(err)).To(gmg.BeTrue(), "v2 should not be available anymore")
}).Should(gmg.Succeed())

// Although v2 is not available anymore, the cache is not invalidated yet so it should return a mapping.
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v2")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We request Limo, which is not in the mapper because it doesn't exist.
// This will trigger a reload of the lazy mapper cache.
// Reloading the cache will read v2 again and since it's not available anymore, it should invalidate the cache.
// #1: GET https://host/apis/inventory.example.com/v1
// #2: GET https://host/apis/inventory.example.com/v2
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: "Limo"})
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))
crt.Reset()

// Now we request v2 again and it should return an error since the cache was invalidated.
// #1: GET https://host/apis/inventory.example.com/v2
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v2")
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))
})
}

func createNewCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
crd := &apiextensionsv1.CustomResourceDefinition{}
err := c.Get(ctx, types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Spec.Group = group
newCRD.Name = plural + "." + group
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: kind,
Plural: plural,
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(ctx, newCRD)).To(gmg.Succeed())

return newCRD
}

func beNoMatchError() gomegatypes.GomegaMatcher {
Expand Down Expand Up @@ -594,6 +705,7 @@ func (e *errorMatcher) Match(actual interface{}) (success bool, err error) {
func (e *errorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("to be %s error", e.message))
}

func (e *errorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("not to be %s error", e.message))
}

0 comments on commit 40b41df

Please sign in to comment.