Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tracking for deleted namespace status check in restore flow #8233

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/8233-sangitaray2021
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added tracking for deleted namespace status check in restore flow.
23 changes: 12 additions & 11 deletions pkg/controller/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,17 +559,18 @@ func (r *restoreReconciler) runValidatedRestore(restore *api.Restore, info backu
}

restoreReq := &pkgrestore.Request{
Log: restoreLog,
Restore: restore,
Backup: info.backup,
PodVolumeBackups: podVolumeBackups,
VolumeSnapshots: volumeSnapshots,
BackupReader: backupFile,
ResourceModifiers: resourceModifiers,
DisableInformerCache: r.disableInformerCache,
CSIVolumeSnapshots: csiVolumeSnapshots,
BackupVolumeInfoMap: backupVolumeInfoMap,
RestoreVolumeInfoTracker: volume.NewRestoreVolInfoTracker(restore, restoreLog, r.globalCrClient),
Log: restoreLog,
Restore: restore,
Backup: info.backup,
PodVolumeBackups: podVolumeBackups,
VolumeSnapshots: volumeSnapshots,
BackupReader: backupFile,
ResourceModifiers: resourceModifiers,
DisableInformerCache: r.disableInformerCache,
CSIVolumeSnapshots: csiVolumeSnapshots,
BackupVolumeInfoMap: backupVolumeInfoMap,
RestoreVolumeInfoTracker: volume.NewRestoreVolInfoTracker(restore, restoreLog, r.globalCrClient),
ResourceDeletionStatusTracker: kubeutil.NewResourceDeletionStatusTracker(),
}
restoreWarnings, restoreErrors := r.restorer.RestoreWithResolvers(restoreReq, actionsResolver, pluginManager)

Expand Down
26 changes: 14 additions & 12 deletions pkg/restore/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/itemoperation"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

const (
Expand All @@ -52,18 +53,19 @@ func resourceKey(obj runtime.Object) string {
type Request struct {
*velerov1api.Restore

Log logrus.FieldLogger
Backup *velerov1api.Backup
PodVolumeBackups []*velerov1api.PodVolumeBackup
VolumeSnapshots []*volume.Snapshot
BackupReader io.Reader
RestoredItems map[itemKey]restoredItemStatus
itemOperationsList *[]*itemoperation.RestoreOperation
ResourceModifiers *resourcemodifiers.ResourceModifiers
DisableInformerCache bool
CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot
BackupVolumeInfoMap map[string]volume.BackupVolumeInfo
RestoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
Log logrus.FieldLogger
Backup *velerov1api.Backup
PodVolumeBackups []*velerov1api.PodVolumeBackup
VolumeSnapshots []*volume.Snapshot
BackupReader io.Reader
RestoredItems map[itemKey]restoredItemStatus
itemOperationsList *[]*itemoperation.RestoreOperation
ResourceModifiers *resourcemodifiers.ResourceModifiers
DisableInformerCache bool
CSIVolumeSnapshots []*snapshotv1api.VolumeSnapshot
BackupVolumeInfoMap map[string]volume.BackupVolumeInfo
RestoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
ResourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
}

type restoredItemStatus struct {
Expand Down
38 changes: 21 additions & 17 deletions pkg/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,23 @@ type Restorer interface {

// kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster.
type kubernetesRestorer struct {
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
namespaceClient corev1.NamespaceInterface
podVolumeRestorerFactory podvolume.RestorerFactory
podVolumeTimeout time.Duration
resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration
resourcePriorities types.Priorities
fileSystem filesystem.Interface
pvRenamer func(string) (string, error)
logger logrus.FieldLogger
podCommandExecutor podexec.PodCommandExecutor
podGetter cache.Getter
credentialFileStore credentials.FileStore
kbClient crclient.Client
multiHookTracker *hook.MultiHookTracker
discoveryHelper discovery.Helper
dynamicFactory client.DynamicFactory
namespaceClient corev1.NamespaceInterface
podVolumeRestorerFactory podvolume.RestorerFactory
podVolumeTimeout time.Duration
resourceTerminatingTimeout time.Duration
resourceTimeout time.Duration
resourcePriorities types.Priorities
fileSystem filesystem.Interface
pvRenamer func(string) (string, error)
logger logrus.FieldLogger
podCommandExecutor podexec.PodCommandExecutor
podGetter cache.Getter
credentialFileStore credentials.FileStore
kbClient crclient.Client
multiHookTracker *hook.MultiHookTracker
resourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
}

// NewKubernetesRestorer creates a new kubernetesRestorer.
Expand Down Expand Up @@ -323,6 +324,7 @@ func (kr *kubernetesRestorer) RestoreWithResolvers(
backupVolumeInfoMap: req.BackupVolumeInfoMap,
restoreVolumeInfoTracker: req.RestoreVolumeInfoTracker,
hooksWaitExecutor: hooksWaitExecutor,
resourceDeletionStatusTracker: req.ResourceDeletionStatusTracker,
}

return restoreCtx.execute()
Expand Down Expand Up @@ -371,6 +373,7 @@ type restoreContext struct {
backupVolumeInfoMap map[string]volume.BackupVolumeInfo
restoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker
hooksWaitExecutor *hooksWaitExecutor
resourceDeletionStatusTracker kube.ResourceDeletionStatusTracker
}

type resourceClientKey struct {
Expand Down Expand Up @@ -718,6 +721,7 @@ func (ctx *restoreContext) processSelectedResource(
ns,
ctx.namespaceClient,
ctx.resourceTerminatingTimeout,
ctx.resourceDeletionStatusTracker,
)
if err != nil {
errs.AddVeleroError(err)
Expand Down Expand Up @@ -1119,7 +1123,7 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso
// namespace into which the resource is being restored into exists.
// This is the *remapped* namespace that we are ensuring exists.
nsToEnsure := getNamespace(ctx.log, archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", obj.GetNamespace()), namespace)
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout)
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(nsToEnsure, ctx.namespaceClient, ctx.resourceTerminatingTimeout, ctx.resourceDeletionStatusTracker)
if err != nil {
errs.AddVeleroError(err)
return warnings, errs, itemExists
Expand Down
34 changes: 20 additions & 14 deletions pkg/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
uploadermocks "github.com/vmware-tanzu/velero/pkg/podvolume/mocks"
"github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/types"
"github.com/vmware-tanzu/velero/pkg/util/kube"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
. "github.com/vmware-tanzu/velero/pkg/util/results"
)
Expand Down Expand Up @@ -2292,10 +2293,11 @@ func TestShouldRestore(t *testing.T) {
h := newHarness(t)

ctx := &restoreContext{
log: h.log,
dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
namespaceClient: h.KubeClient.CoreV1().Namespaces(),
resourceTerminatingTimeout: time.Millisecond,
log: h.log,
dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
namespaceClient: h.KubeClient.CoreV1().Namespaces(),
resourceTerminatingTimeout: time.Millisecond,
resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
}

for _, resource := range tc.apiResources {
Expand Down Expand Up @@ -3710,9 +3712,10 @@ func newHarness(t *testing.T) *harness {
fileSystem: test.NewFakeFileSystem(),

// unsupported
podVolumeRestorerFactory: nil,
podVolumeTimeout: 0,
kbClient: kbClient,
podVolumeRestorerFactory: nil,
podVolumeTimeout: 0,
kbClient: kbClient,
resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
},
log: log,
}
Expand Down Expand Up @@ -3899,9 +3902,10 @@ func TestIsAlreadyExistsError(t *testing.T) {
h := newHarness(t)

ctx := &restoreContext{
log: h.log,
dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
namespaceClient: h.KubeClient.CoreV1().Namespaces(),
log: h.log,
dynamicFactory: client.NewDynamicFactory(h.DynamicClient),
namespaceClient: h.KubeClient.CoreV1().Namespaces(),
resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
}

if test.apiResource != nil {
Expand Down Expand Up @@ -4018,7 +4022,8 @@ func TestHasCSIVolumeSnapshot(t *testing.T) {
h := newHarness(t)

ctx := &restoreContext{
log: h.log,
log: h.log,
resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
}

if tc.vs != nil {
Expand Down Expand Up @@ -4118,9 +4123,10 @@ func TestHasSnapshotDataUpload(t *testing.T) {
h := newHarness(t)

ctx := &restoreContext{
log: h.log,
kbClient: h.restorer.kbClient,
restore: tc.restore,
log: h.log,
kbClient: h.restorer.kbClient,
restore: tc.restore,
resourceDeletionStatusTracker: kube.NewResourceDeletionStatusTracker(),
}

if tc.duResult != nil {
Expand Down
71 changes: 71 additions & 0 deletions pkg/util/kube/resource_deletionstatus_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2018 the Velero contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kube

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)

// ResourceDeletionStatusTracker keeps track of items pending deletion.
// Every method identifies an item by the same (kind, ns, name) triple.
type ResourceDeletionStatusTracker interface {
// Add starts tracking the item identified by kind, ns and name, e.g. a
// namespace whose deletion is being polled for.
Add(kind, ns, name string)
// Delete stops tracking the item identified by kind, ns and name, e.g.
// once the namespace deletion has completed.
Delete(kind, ns, name string)
// Contains reports whether the item identified by kind, ns and name is
// currently being tracked.
Contains(kind, ns, name string) bool
}

// resourceDeletionStatusTracker stores the tracked items as string keys in a
// set and serializes access to that set with a read/write mutex.
type resourceDeletionStatusTracker struct {
// lock guards isNameSpacePresentInPollingSet.
lock sync.RWMutex
// isNameSpacePresentInPollingSet holds one key per tracked item; keys are
// built by resourceDeletionStatusTrackerKey.
isNameSpacePresentInPollingSet sets.Set[string]
}

// NewResourceDeletionStatusTracker builds a ResourceDeletionStatusTracker
// whose set of tracked items starts out empty.
func NewResourceDeletionStatusTracker() ResourceDeletionStatusTracker {
	tracker := new(resourceDeletionStatusTracker)
	tracker.isNameSpacePresentInPollingSet = sets.New[string]()
	return tracker
}

// Add inserts the key for the item identified by kind, ns and name into the
// tracked set while holding the write lock.
func (bt *resourceDeletionStatusTracker) Add(kind, ns, name string) {
	// Building the key touches no shared state, so it is done before locking.
	key := resourceDeletionStatusTrackerKey(kind, ns, name)

	bt.lock.Lock()
	defer bt.lock.Unlock()

	bt.isNameSpacePresentInPollingSet.Insert(key)
}

func (bt *resourceDeletionStatusTracker) Delete(kind, ns, name string) {
bt.lock.Lock()
defer bt.lock.Unlock()

bt.isNameSpacePresentInPollingSet.Delete(resourceDeletionStatusTrackerKey(kind, ns, name))

Check warning on line 59 in pkg/util/kube/resource_deletionstatus_tracker.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/resource_deletionstatus_tracker.go#L55-L59

Added lines #L55 - L59 were not covered by tests
}

// Contains reports whether the key for the item identified by kind, ns and
// name is present in the tracked set. Only the read lock is taken.
func (bt *resourceDeletionStatusTracker) Contains(kind, ns, name string) bool {
	// Building the key touches no shared state, so it is done before locking.
	key := resourceDeletionStatusTrackerKey(kind, ns, name)

	bt.lock.RLock()
	defer bt.lock.RUnlock()

	tracked := bt.isNameSpacePresentInPollingSet.Has(key)
	return tracked
}

// resourceDeletionStatusTrackerKey joins kind, ns and name with "/" to form
// the string under which an item is stored in the tracker's set.
func resourceDeletionStatusTrackerKey(kind, ns, name string) string {
	const layout = "%s/%s/%s"

	key := fmt.Sprintf(layout, kind, ns, name)
	return key
}
16 changes: 14 additions & 2 deletions pkg/util/kube/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func NamespaceAndName(objMeta metav1.Object) string {
//
// namespace already exists and is not ready, this function will return (false, false, nil).
// If the namespace exists and is marked for deletion, this function will wait up to the timeout for it to fully delete.
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration) (ready bool, nsCreated bool, err error) {
func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client corev1client.NamespaceInterface, timeout time.Duration, resourceDeletionStatusTracker ResourceDeletionStatusTracker) (ready bool, nsCreated bool, err error) {
// nsCreated tells whether the namespace was created by this method
// required for keeping track of number of restored items
// if namespace is marked for deletion, and we timed out, report an error
var terminatingNamespace bool

var namespaceAlreadyInDeletionTracker bool

err = wait.PollUntilContextTimeout(context.Background(), time.Second, timeout, true, func(ctx context.Context) (bool, error) {
clusterNS, err := client.Get(ctx, namespace.Name, metav1.GetOptions{})
// if namespace is marked for deletion, and we timed out, report an error
Expand All @@ -92,8 +95,12 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// Return the err and exit the loop.
return true, err
}

if clusterNS != nil && (clusterNS.GetDeletionTimestamp() != nil || clusterNS.Status.Phase == corev1api.NamespaceTerminating) {
if resourceDeletionStatusTracker.Contains(clusterNS.Kind, clusterNS.Name, clusterNS.Name) {
namespaceAlreadyInDeletionTracker = true
return true, errors.Errorf("namespace %s is already present in the polling set, skipping execution", namespace.Name)
}

// Marked for deletion, keep waiting
terminatingNamespace = true
return false, nil
Expand All @@ -107,7 +114,12 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core
// err will be set if we timed out or encountered issues retrieving the namespace,
if err != nil {
if terminatingNamespace {
// If the namespace is marked for deletion and we timed out, add it to the tracker.
resourceDeletionStatusTracker.Add(namespace.Kind, namespace.Name, namespace.Name)
return false, nsCreated, errors.Wrapf(err, "timed out waiting for terminating namespace %s to disappear before restoring", namespace.Name)
} else if namespaceAlreadyInDeletionTracker {
// If the namespace is already in the tracker, return an error.
return false, nsCreated, errors.Wrapf(err, "skipping polling for terminating namespace %s", namespace.Name)
}
return false, nsCreated, errors.Wrapf(err, "error getting namespace %s", namespace.Name)
}
Expand Down
33 changes: 24 additions & 9 deletions pkg/util/kube/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ func TestNamespaceAndName(t *testing.T) {

func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
tests := []struct {
name string
expectNSFound bool
nsPhase corev1.NamespacePhase
nsDeleting bool
expectCreate bool
alreadyExists bool
expectedResult bool
expectedCreatedResult bool
name string
expectNSFound bool
nsPhase corev1.NamespacePhase
nsDeleting bool
expectCreate bool
alreadyExists bool
expectedResult bool
expectedCreatedResult bool
nsAlreadyInTerminationTracker bool
ResourceDeletionStatusTracker ResourceDeletionStatusTracker
}{
{
name: "namespace found, not deleting",
Expand Down Expand Up @@ -95,8 +97,17 @@ func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
expectedResult: false,
expectedCreatedResult: false,
},
{
name: "same namespace found earlier, terminating phase already tracked",
expectNSFound: true,
nsPhase: corev1.NamespaceTerminating,
expectedResult: false,
expectedCreatedResult: false,
nsAlreadyInTerminationTracker: true,
},
}

resourceDeletionStatusTracker := NewResourceDeletionStatusTracker()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
namespace := &corev1.Namespace{
Expand Down Expand Up @@ -132,7 +143,11 @@ func TestEnsureNamespaceExistsAndIsReady(t *testing.T) {
nsClient.On("Create", namespace).Return(namespace, nil)
}

result, nsCreated, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout)
if test.nsAlreadyInTerminationTracker {
resourceDeletionStatusTracker.Add(namespace.Kind, "test", "test")
}

result, nsCreated, _ := EnsureNamespaceExistsAndIsReady(namespace, nsClient, timeout, resourceDeletionStatusTracker)

assert.Equal(t, test.expectedResult, result)
assert.Equal(t, test.expectedCreatedResult, nsCreated)
Expand Down
Loading