From ccadf59d575419d9d65995853255163ad30f1d1c Mon Sep 17 00:00:00 2001
From: Xun Jiang
Date: Wed, 21 Aug 2024 15:14:53 +0800
Subject: [PATCH] Implement the Repo maintenance Job configuration design.
 Remove the resource parameters from the velero server CLI.

Signed-off-by: Xun Jiang
---
 changelogs/unreleased/8145-blackpiglet        |   1 +
 design/repo_maintenance_job_config.md         |  16 +-
 pkg/cmd/cli/install/install.go                |   8 -
 pkg/cmd/cli/nodeagent/server.go               |  18 +-
 pkg/cmd/server/server.go                      |  50 +++--
 .../backup_repository_controller.go           |  10 +-
 pkg/controller/data_upload_controller.go      |  23 +-
 pkg/controller/data_upload_controller_test.go |  16 +-
 pkg/exposer/csi_snapshot.go                   |  68 ++----
 pkg/exposer/csi_snapshot_test.go              | 101 ---------
 pkg/install/deployment.go                     |  28 ---
 pkg/install/deployment_test.go                |  16 --
 pkg/install/resources.go                      |   1 -
 pkg/nodeagent/node_agent.go                   |   3 +-
 pkg/repository/maintenance.go                 | 168 ++++++++++++--
 pkg/repository/maintenance_test.go            | 210 ++++++++++++++++--
 pkg/repository/manager.go                     |  71 ++++--
 pkg/repository/manager_test.go                |   3 +-
 pkg/types/common.go                           |  28 +++
 pkg/util/csi/util.go                          |  48 ++++
 pkg/util/csi/util_test.go                     | 108 +++++++++
 21 files changed, 698 insertions(+), 297 deletions(-)
 create mode 100644 changelogs/unreleased/8145-blackpiglet
 create mode 100644 pkg/types/common.go

diff --git a/changelogs/unreleased/8145-blackpiglet b/changelogs/unreleased/8145-blackpiglet
new file mode 100644
index 00000000000..6e64442bc94
--- /dev/null
+++ b/changelogs/unreleased/8145-blackpiglet
@@ -0,0 +1 @@
+Implement the Repo maintenance Job configuration.
diff --git a/design/repo_maintenance_job_config.md b/design/repo_maintenance_job_config.md
index ad055795de6..77969263101 100644
--- a/design/repo_maintenance_job_config.md
+++ b/design/repo_maintenance_job_config.md
@@ -49,8 +49,6 @@ velero server \
 ### Structure
 The data structure for ```repo-maintenance-job-config``` is as below:
 ```go
-type MaintenanceConfigMap map[string]Configs
-
 type Configs struct {
 	// LoadAffinity is the config for data path load affinity.
 	LoadAffinity []*LoadAffinity `json:"loadAffinity,omitempty"`
@@ -80,15 +78,19 @@ type Resources struct {
 ```
 The ConfigMap content is a map.
-If there is a key value as `global` in the map, the key's value is applied to all BackupRepositories maintenance jobs that don't their own specific configuration in the ConfigMap.
+If there is a key named `global` in the map, its value is applied to all BackupRepositories' maintenance jobs that cannot find their own specific configuration in the ConfigMap.
 The other keys in the map is the combination of three elements of a BackupRepository:
-* The namespace in which BackupRepository backs up volume data
-* The BackupRepository referenced BackupStorageLocation's name
-* The BackupRepository's type. Possible values are `kopia` and `restic`
+* The namespace in which the BackupRepository backs up volume data.
+* The name of the BackupStorageLocation the BackupRepository references.
+* The BackupRepository's type. Possible values are `kopia` and `restic`.
+
+Those three keys can identify a [unique BackupRepository](https://github.com/vmware-tanzu/velero/blob/2fc6300f2239f250b40b0488c35feae59520f2d3/pkg/repository/backup_repo_op.go#L32-L37).
+
 If there is a key match with BackupRepository, the key's value is applied to the BackupRepository's maintenance jobs.
 By this way, it's possible to let user configure before the BackupRepository is created.
 This is especially convenient for administrator configuring during the Velero installation.
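For illustration only (this ConfigMap is not part of the patch; the resource values and the `velero` namespace are example assumptions), a complete `repo-maintenance-job-config` ConfigMap combining a `global` entry with a repository-specific entry could look like this; the key format of the specific entry is explained by the BackupRepository example that follows:

``` yaml
apiVersion: v1
kind: ConfigMap
metadata:
  # The name must match the value passed to --repo-maintenance-job-config.
  name: repo-maintenance-job-config
  namespace: velero
data:
  # Fallback applied to every maintenance job without a specific entry.
  global: |
    {
      "cpuRequest": "50m",
      "cpuLimit": "100m",
      "memRequest": "50Mi",
      "memLimit": "100Mi"
    }
  # Overrides for the BackupRepository backing up namespace "test",
  # using BSL "default" and repository type "kopia".
  test-default-kopia: |
    {
      "cpuRequest": "100m",
      "cpuLimit": "200m"
    }
```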
-For example, the following BackupRepository's key should be `test-default-kopia` +For example, the following BackupRepository's key should be `test-default-kopia`. + ``` yaml - apiVersion: velero.io/v1 kind: BackupRepository diff --git a/pkg/cmd/cli/install/install.go b/pkg/cmd/cli/install/install.go index b73438e78aa..39fd727b4fa 100644 --- a/pkg/cmd/cli/install/install.go +++ b/pkg/cmd/cli/install/install.go @@ -23,7 +23,6 @@ import ( "strings" "time" - "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/pkg/errors" @@ -85,7 +84,6 @@ type Options struct { DefaultSnapshotMoveData bool DisableInformerCache bool ScheduleSkipImmediately bool - MaintenanceCfg repository.MaintenanceConfig } // BindFlags adds command line values to the options struct. @@ -130,11 +128,6 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.DefaultSnapshotMoveData, "default-snapshot-move-data", o.DefaultSnapshotMoveData, "Bool flag to configure Velero server to move data by default for all snapshots supporting data movement. Optional.") flags.BoolVar(&o.DisableInformerCache, "disable-informer-cache", o.DisableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable). Optional.") flags.BoolVar(&o.ScheduleSkipImmediately, "schedule-skip-immediately", o.ScheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).") - flags.IntVar(&o.MaintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", o.MaintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Optional.") - flags.StringVar(&o.MaintenanceCfg.CPURequest, "maintenance-job-cpu-request", o.MaintenanceCfg.CPURequest, "CPU request for maintenance jobs. Default is no limit.") - flags.StringVar(&o.MaintenanceCfg.MemRequest, "maintenance-job-mem-request", o.MaintenanceCfg.MemRequest, "Memory request for maintenance jobs. Default is no limit.") - flags.StringVar(&o.MaintenanceCfg.CPULimit, "maintenance-job-cpu-limit", o.MaintenanceCfg.CPULimit, "CPU limit for maintenance jobs. Default is no limit.") - flags.StringVar(&o.MaintenanceCfg.MemLimit, "maintenance-job-mem-limit", o.MaintenanceCfg.MemLimit, "Memory limit for maintenance jobs. Default is no limit.") } // NewInstallOptions instantiates a new, default InstallOptions struct. 
@@ -231,7 +224,6 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) { DefaultSnapshotMoveData: o.DefaultSnapshotMoveData, DisableInformerCache: o.DisableInformerCache, ScheduleSkipImmediately: o.ScheduleSkipImmediately, - MaintenanceCfg: o.MaintenanceCfg, }, nil } diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 20a4654fb65..2e5e16055c5 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -59,6 +59,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" "github.com/vmware-tanzu/velero/pkg/repository" + velerotypes "github.com/vmware-tanzu/velero/pkg/types" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" @@ -292,7 +293,7 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - var loadAffinity *nodeagent.LoadAffinity + var loadAffinity *velerotypes.LoadAffinity if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] } @@ -302,7 +303,20 @@ func (s *nodeAgentServer) run() { backupPVCConfig = s.dataPathConfigs.BackupPVCConfig } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler( + s.mgr.GetClient(), + s.mgr, + s.kubeClient, + s.csiSnapshotClient.SnapshotV1(), + s.dataPathMgr, + loadAffinity, + backupPVCConfig, + clock.RealClock{}, + s.nodeName, + s.config.dataMoverPrepareTimeout, + s.logger, + s.metrics, + ) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 3a2b0702433..f7d2d9c5a16 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -139,8 +139,8 @@ type serverConfig struct { defaultSnapshotMoveData bool disableInformerCache bool scheduleSkipImmediately bool - maintenanceCfg repository.MaintenanceConfig - backukpRepoConfig string + backupRepoConfig string + repoMaintenanceJobConfig string } func NewCommand(f client.Factory) *cobra.Command { @@ -172,9 +172,6 @@ func NewCommand(f client.Factory) *cobra.Command { defaultSnapshotMoveData: false, disableInformerCache: defaultDisableInformerCache, scheduleSkipImmediately: false, - maintenanceCfg: repository.MaintenanceConfig{ - KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs, - }, } ) @@ -248,17 +245,20 @@ func NewCommand(f client.Factory) *cobra.Command { command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.") command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. 
Default is false (don't disable).") command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).") - command.Flags().IntVar(&config.maintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", config.maintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Optional.") - command.Flags().StringVar(&config.maintenanceCfg.CPURequest, "maintenance-job-cpu-request", config.maintenanceCfg.CPURequest, "CPU request for maintenance job. Default is no limit.") - command.Flags().StringVar(&config.maintenanceCfg.MemRequest, "maintenance-job-mem-request", config.maintenanceCfg.MemRequest, "Memory request for maintenance job. Default is no limit.") - command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is no limit.") - command.Flags().StringVar(&config.maintenanceCfg.MemLimit, "maintenance-job-mem-limit", config.maintenanceCfg.MemLimit, "Memory limit for maintenance job. Default is no limit.") - command.Flags().StringVar(&config.backukpRepoConfig, "backup-repository-config", config.backukpRepoConfig, "The name of configMap containing backup repository configurations.") + command.Flags().StringVar( + &config.backupRepoConfig, + "backup-repository-config", + config.backupRepoConfig, + "The name of configMap containing backup repository configurations.", + ) + command.Flags().StringVar( + &config.repoMaintenanceJobConfig, + "repo-maintenance-job-config", + config.repoMaintenanceJobConfig, + "The name of ConfigMap containing repository maintenance Job configurations.", + ) - // maintenance job log setting inherited from velero server - config.maintenanceCfg.FormatFlag = config.formatFlag - config.maintenanceCfg.LogLevelFlag = logLevelFlag return command } @@ -667,7 +667,18 @@ func (s *server) initRepoManager() error { s.repoLocker = repository.NewRepoLocker() s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) - s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.config.maintenanceCfg, s.logger) + s.repoManager = repository.NewManager( + s.namespace, + s.mgr.GetClient(), + s.repoLocker, + s.repoEnsurer, + s.credentialFileStore, + s.credentialSecretStore, + s.config.repoMaintenanceJobConfig, + s.logger, + s.logLevel, + s.config.formatFlag, + ) return nil } @@ -881,7 +892,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string } if _, ok := enabledRuntimeControllers[controller.BackupRepo]; ok { - if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.config.backukpRepoConfig, s.repoManager).SetupWithManager(s.mgr); err != nil { + if err := controller.NewBackupRepoReconciler( + s.namespace, + s.logger, + s.mgr.GetClient(), + s.config.repoMaintenanceFrequency, + s.config.backupRepoConfig, + s.repoManager, + ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupRepo) } } diff --git a/pkg/controller/backup_repository_controller.go b/pkg/controller/backup_repository_controller.go index 0bc457a1727..4ec0e4e800e 100644 --- a/pkg/controller/backup_repository_controller.go +++ 
b/pkg/controller/backup_repository_controller.go @@ -55,19 +55,19 @@ type BackupRepoReconciler struct { logger logrus.FieldLogger clock clocks.WithTickerAndDelayedExecution maintenanceFrequency time.Duration - backukpRepoConfig string + backupRepoConfig string repositoryManager repository.Manager } func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client, - maintenanceFrequency time.Duration, backukpRepoConfig string, repositoryManager repository.Manager) *BackupRepoReconciler { + maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repository.Manager) *BackupRepoReconciler { c := &BackupRepoReconciler{ client, namespace, logger, clocks.RealClock{}, maintenanceFrequency, - backukpRepoConfig, + backupRepoConfig, repositoryManager, } @@ -229,7 +229,7 @@ func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *veler } func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error { - log.WithField("repoConfig", r.backukpRepoConfig).Info("Initializing backup repository") + log.WithField("repoConfig", r.backupRepoConfig).Info("Initializing backup repository") // confirm the repo's BackupStorageLocation is valid repoIdentifier, err := r.getIdentiferByBSL(ctx, req) @@ -244,7 +244,7 @@ func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1 }) } - config, err := getBackupRepositoryConfig(ctx, r, r.backukpRepoConfig, r.namespace, req.Name, req.Spec.RepositoryType, log) + config, err := getBackupRepositoryConfig(ctx, r, r.backupRepoConfig, r.namespace, req.Name, req.Spec.RepositoryType, log) if err != nil { log.WithError(err).Warn("Failed to get repo config, repo config is ignored") } else if config != nil { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 22dc59bd537..f1b9decc00d 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -39,8 +40,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1" - "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -49,6 +48,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" + velerotypes "github.com/vmware-tanzu/velero/pkg/types" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -71,15 +71,26 @@ type DataUploadReconciler struct { logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager - loadAffinity *nodeagent.LoadAffinity + loadAffinity *velerotypes.LoadAffinity backupPVCConfig map[string]nodeagent.BackupPVC preparingTimeout time.Duration metrics *metrics.ServerMetrics } -func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, 
csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution, - nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { +func NewDataUploadReconciler( + client client.Client, + mgr manager.Manager, + kubeClient kubernetes.Interface, + csiSnapshotClient snapshotter.SnapshotV1Interface, + dataPathMgr *datapath.Manager, + loadAffinity *velerotypes.LoadAffinity, + backupPVCConfig map[string]nodeagent.BackupPVC, + clock clocks.WithTickerAndDelayedExecution, + nodeName string, + preparingTimeout time.Duration, + log logrus.FieldLogger, + metrics *metrics.ServerMetrics, +) *DataUploadReconciler { return &DataUploadReconciler{ client: client, mgr: mgr, diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index a6ee25574d5..e73c13b976e 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -231,8 +231,20 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet) - return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{}, - testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataUploadReconciler( + fakeClient, + nil, + fakeKubeClient, + fakeSnapshotClient.SnapshotV1(), + dataPathMgr, + nil, + map[string]nodeagent.BackupPVC{}, + testclocks.NewFakeClock(now), + "test-node", + time.Minute*5, + velerotest.NewLogger(), + metrics.NewServerMetrics(), + ), nil } func dataUploadBuilder() *builder.DataUploadBuilder { diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index ccdb70dd281..f2c86d75783 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/pkg/nodeagent" + velerotypes "github.com/vmware-tanzu/velero/pkg/types" "github.com/vmware-tanzu/velero/pkg/util/boolptr" "github.com/vmware-tanzu/velero/pkg/util/csi" "github.com/vmware-tanzu/velero/pkg/util/kube" @@ -66,7 +67,7 @@ type CSISnapshotExposeParam struct { VolumeSize resource.Quantity // Affinity specifies the node affinity of the backup pod - Affinity *nodeagent.LoadAffinity + Affinity *velerotypes.LoadAffinity // BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement BackupPVCConfig map[string]nodeagent.BackupPVC @@ -191,7 +192,14 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje } }() - backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity) + backupPod, err := e.createBackupPod( + ctx, + ownerObject, + backupPVC, + csiExposeParam.OperationTimeout, + csiExposeParam.HostingPodLabels, + csiExposeParam.Affinity, + ) if err != nil { return errors.Wrap(err, "error to create backup pod") } @@ -422,8 +430,14 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co return created, err } -func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, 
ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, operationTimeout time.Duration, - label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) { +func (e *csiSnapshotExposer) createBackupPod( + ctx context.Context, + ownerObject corev1.ObjectReference, + backupPVC *corev1.PersistentVolumeClaim, + operationTimeout time.Duration, + label map[string]string, + affinity *velerotypes.LoadAffinity, +) (*corev1.Pod, error) { podName := ownerObject.Name containerName := string(ownerObject.UID) @@ -470,6 +484,11 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co userID := int64(0) + affinityList := make([]*velerotypes.LoadAffinity, 0) + if affinity != nil { + affinityList = append(affinityList, affinity) + } + pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, @@ -498,7 +517,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co }, }, }, - Affinity: toSystemAffinity(affinity), + Affinity: csi.ToSystemAffinity(affinityList), Containers: []corev1.Container{ { Name: containerName, @@ -527,42 +546,3 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{}) } - -func toSystemAffinity(loadAffinity *nodeagent.LoadAffinity) *corev1.Affinity { - if loadAffinity == nil { - return nil - } - - requirements := []corev1.NodeSelectorRequirement{} - for k, v := range loadAffinity.NodeSelector.MatchLabels { - requirements = append(requirements, corev1.NodeSelectorRequirement{ - Key: k, - Values: []string{v}, - Operator: corev1.NodeSelectorOpIn, - }) - } - - for _, exp := range loadAffinity.NodeSelector.MatchExpressions { - requirements = append(requirements, corev1.NodeSelectorRequirement{ - Key: exp.Key, - Values: exp.Values, - Operator: corev1.NodeSelectorOperator(exp.Operator), - }) - } - - if len(requirements) == 0 { - return nil - } - - return &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: requirements, - }, - }, - }, - }, - } -} diff --git a/pkg/exposer/csi_snapshot_test.go b/pkg/exposer/csi_snapshot_test.go index 82044dbb8b8..3a63545900f 100644 --- a/pkg/exposer/csi_snapshot_test.go +++ b/pkg/exposer/csi_snapshot_test.go @@ -19,7 +19,6 @@ package exposer import ( "context" "fmt" - "reflect" "testing" "time" @@ -39,7 +38,6 @@ import ( corev1 "k8s.io/api/core/v1" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/nodeagent" velerotest "github.com/vmware-tanzu/velero/pkg/test" "github.com/vmware-tanzu/velero/pkg/util/boolptr" @@ -726,105 +724,6 @@ func TestPeekExpose(t *testing.T) { } } -func TestToSystemAffinity(t *testing.T) { - tests := []struct { - name string - loadAffinity *nodeagent.LoadAffinity - expected *corev1.Affinity - }{ - { - name: "loadAffinity is nil", - }, - { - name: "loadAffinity is empty", - loadAffinity: &nodeagent.LoadAffinity{}, - }, - { - name: "with match label", - loadAffinity: &nodeagent.LoadAffinity{ - NodeSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{ - "key-1": "value-1", - }, - }, - }, - expected: &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: 
[]corev1.NodeSelectorRequirement{ - { - Key: "key-1", - Values: []string{"value-1"}, - Operator: corev1.NodeSelectorOpIn, - }, - }, - }, - }, - }, - }, - }, - }, - { - name: "with match expression", - loadAffinity: &nodeagent.LoadAffinity{ - NodeSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{ - "key-2": "value-2", - }, - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "key-3", - Values: []string{"value-3-1", "value-3-2"}, - Operator: metav1.LabelSelectorOpNotIn, - }, - { - Key: "key-4", - Values: []string{"value-4-1", "value-4-2", "value-4-3"}, - Operator: metav1.LabelSelectorOpDoesNotExist, - }, - }, - }, - }, - expected: &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Key: "key-2", - Values: []string{"value-2"}, - Operator: corev1.NodeSelectorOpIn, - }, - { - Key: "key-3", - Values: []string{"value-3-1", "value-3-2"}, - Operator: corev1.NodeSelectorOpNotIn, - }, - { - Key: "key-4", - Values: []string{"value-4-1", "value-4-2", "value-4-3"}, - Operator: corev1.NodeSelectorOpDoesNotExist, - }, - }, - }, - }, - }, - }, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - affinity := toSystemAffinity(test.loadAffinity) - assert.True(t, reflect.DeepEqual(affinity, test.expected)) - }) - } -} - func Test_csiSnapshotExposer_createBackupPVC(t *testing.T) { backup := &velerov1.Backup{ TypeMeta: metav1.TypeMeta{ diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go index 9ac46ece869..4a1b6cef172 100644 --- a/pkg/install/deployment.go +++ b/pkg/install/deployment.go @@ -27,7 +27,6 @@ import ( "github.com/vmware-tanzu/velero/internal/velero" "github.com/vmware-tanzu/velero/pkg/builder" - "github.com/vmware-tanzu/velero/pkg/repository" ) type podTemplateOption func(*podTemplateConfig) @@ -52,7 +51,6 @@ type podTemplateConfig struct { privilegedNodeAgent bool disableInformerCache bool scheduleSkipImmediately bool - maintenanceConfig repository.MaintenanceConfig } func WithImage(image string) podTemplateOption { @@ -179,12 +177,6 @@ func WithScheduleSkipImmediately(b bool) podTemplateOption { } } -func WithMaintenanceConfig(config repository.MaintenanceConfig) podTemplateOption { - return func(c *podTemplateConfig) { - c.maintenanceConfig = config - } -} - func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment { // TODO: Add support for server args c := &podTemplateConfig{ @@ -242,26 +234,6 @@ func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment args = append(args, fmt.Sprintf("--fs-backup-timeout=%v", c.podVolumeOperationTimeout)) } - if c.maintenanceConfig.KeepLatestMaitenanceJobs > 0 { - args = append(args, fmt.Sprintf("--keep-latest-maintenance-jobs=%d", c.maintenanceConfig.KeepLatestMaitenanceJobs)) - } - - if c.maintenanceConfig.CPULimit != "" { - args = append(args, fmt.Sprintf("--maintenance-job-cpu-limit=%s", c.maintenanceConfig.CPULimit)) - } - - if c.maintenanceConfig.CPURequest != "" { - args = append(args, fmt.Sprintf("--maintenance-job-cpu-request=%s", c.maintenanceConfig.CPURequest)) - } - - if c.maintenanceConfig.MemLimit != "" { - args = append(args, fmt.Sprintf("--maintenance-job-mem-limit=%s", c.maintenanceConfig.MemLimit)) - } - - if c.maintenanceConfig.MemRequest != "" { - args = append(args, fmt.Sprintf("--maintenance-job-mem-request=%s", 
c.maintenanceConfig.MemRequest)) - } - deployment := &appsv1.Deployment{ ObjectMeta: objectMeta(namespace, "velero"), TypeMeta: metav1.TypeMeta{ diff --git a/pkg/install/deployment_test.go b/pkg/install/deployment_test.go index 426a53df696..8d5f5449868 100644 --- a/pkg/install/deployment_test.go +++ b/pkg/install/deployment_test.go @@ -22,8 +22,6 @@ import ( "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - - "github.com/vmware-tanzu/velero/pkg/repository" ) func TestDeployment(t *testing.T) { @@ -70,18 +68,4 @@ func TestDeployment(t *testing.T) { deploy = Deployment("velero", WithDisableInformerCache(true)) assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 2) assert.Equal(t, "--disable-informer-cache=true", deploy.Spec.Template.Spec.Containers[0].Args[1]) - - deploy = Deployment("velero", WithMaintenanceConfig(repository.MaintenanceConfig{ - KeepLatestMaitenanceJobs: 3, - CPURequest: "100m", - MemRequest: "256Mi", - CPULimit: "200m", - MemLimit: "512Mi", - })) - assert.Len(t, deploy.Spec.Template.Spec.Containers[0].Args, 6) - assert.Equal(t, "--keep-latest-maintenance-jobs=3", deploy.Spec.Template.Spec.Containers[0].Args[1]) - assert.Equal(t, "--maintenance-job-cpu-limit=200m", deploy.Spec.Template.Spec.Containers[0].Args[2]) - assert.Equal(t, "--maintenance-job-cpu-request=100m", deploy.Spec.Template.Spec.Containers[0].Args[3]) - assert.Equal(t, "--maintenance-job-mem-limit=512Mi", deploy.Spec.Template.Spec.Containers[0].Args[4]) - assert.Equal(t, "--maintenance-job-mem-request=256Mi", deploy.Spec.Template.Spec.Containers[0].Args[5]) } diff --git a/pkg/install/resources.go b/pkg/install/resources.go index 752bc025b9e..9c4ecca8569 100644 --- a/pkg/install/resources.go +++ b/pkg/install/resources.go @@ -350,7 +350,6 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { WithPodVolumeOperationTimeout(o.PodVolumeOperationTimeout), WithUploaderType(o.UploaderType), WithScheduleSkipImmediately(o.ScheduleSkipImmediately), - WithMaintenanceConfig(o.MaintenanceCfg), } if len(o.Features) > 0 { diff --git a/pkg/nodeagent/node_agent.go b/pkg/nodeagent/node_agent.go index 78cdc27711f..02613b0fff9 100644 --- a/pkg/nodeagent/node_agent.go +++ b/pkg/nodeagent/node_agent.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/kubernetes" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/vmware-tanzu/velero/pkg/types" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -75,7 +76,7 @@ type Configs struct { LoadConcurrency *LoadConcurrency `json:"loadConcurrency,omitempty"` // LoadAffinity is the config for data path load affinity. 
- LoadAffinity []*LoadAffinity `json:"loadAffinity,omitempty"` + LoadAffinity []*types.LoadAffinity `json:"loadAffinity,omitempty"` // BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement BackupPVCConfig map[string]BackupPVC `json:"backupPVC,omitempty"` diff --git a/pkg/repository/maintenance.go b/pkg/repository/maintenance.go index ffe9f89fd42..28a8488774c 100644 --- a/pkg/repository/maintenance.go +++ b/pkg/repository/maintenance.go @@ -18,10 +18,13 @@ package repository import ( "context" + "encoding/json" "fmt" "sort" "time" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -31,16 +34,17 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/pkg/errors" - + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/repository/provider" + velerotypes "github.com/vmware-tanzu/velero/pkg/types" + csiutil "github.com/vmware-tanzu/velero/pkg/util/csi" "github.com/vmware-tanzu/velero/pkg/util/kube" "github.com/vmware-tanzu/velero/pkg/util/logging" veleroutil "github.com/vmware-tanzu/velero/pkg/util/velero" ) const RepositoryNameLabel = "velero.io/repo-name" -const DefaultKeepLatestMaitenanceJobs = 3 +const DefaultKeepLatestMaintenanceJobs = 3 const DefaultMaintenanceJobCPURequest = "0" const DefaultMaintenanceJobCPULimit = "0" const DefaultMaintenanceJobMemRequest = "0" @@ -55,6 +59,17 @@ type MaintenanceConfig struct { MemLimit string LogLevelFlag *logging.LevelFlag FormatFlag *logging.FormatFlag + Affinity *v1.Affinity +} + +type JobConfigs struct { + // LoadAffinities is the config for repository maintenance job load affinity. 
+ LoadAffinities []*velerotypes.LoadAffinity `json:"loadAffinity,omitempty"` + + CPURequest string `json:"cpuRequest,omitempty"` + MemRequest string `json:"memRequest,omitempty"` + CPULimit string `json:"cpuLimit,omitempty"` + MemLimit string `json:"memLimit,omitempty"` } func generateJobName(repo string) string { @@ -68,7 +83,14 @@ func generateJobName(repo string) string { return jobName } -func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli client.Client, namespace string) (*batchv1.Job, error) { +func buildMaintenanceJob( + config *JobConfigs, + param provider.RepoParam, + cli client.Client, + namespace string, + logLevel logrus.Level, + logFormat *logging.FormatFlag, +) (*batchv1.Job, error) { // Get the Velero server deployment deployment := &appsv1.Deployment{} err := cli.Get(context.TODO(), types.NamespacedName{Name: "velero", Namespace: namespace}, deployment) @@ -92,20 +114,25 @@ func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli clie image := veleroutil.GetVeleroServerImage(deployment) // Set resource limits and requests - if m.CPURequest == "" { - m.CPURequest = DefaultMaintenanceJobCPURequest - } - if m.MemRequest == "" { - m.MemRequest = DefaultMaintenanceJobMemRequest - } - if m.CPULimit == "" { - m.CPULimit = DefaultMaintenanceJobCPULimit - } - if m.MemLimit == "" { - m.MemLimit = DefaultMaintenanceJobMemLimit + cpuRequest := DefaultMaintenanceJobCPURequest + memRequest := DefaultMaintenanceJobMemRequest + cpuLimit := DefaultMaintenanceJobCPULimit + memLimit := DefaultMaintenanceJobMemLimit + if config != nil { + if config.CPURequest != "" { + cpuRequest = config.CPURequest + } + if config.MemRequest != "" { + memRequest = config.MemRequest + } + if config.CPULimit != "" { + cpuLimit = config.CPULimit + } + if config.MemLimit != "" { + memLimit = config.MemLimit + } } - - resources, err := kube.ParseResourceRequirements(m.CPURequest, m.MemRequest, m.CPULimit, m.MemLimit) + resources, err := kube.ParseResourceRequirements(cpuRequest, memRequest, cpuLimit, memLimit) if err != nil { return nil, errors.Wrap(err, "failed to parse resource requirements for maintenance job") } @@ -115,8 +142,8 @@ func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli clie args = append(args, fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace)) args = append(args, fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType)) args = append(args, fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name)) - args = append(args, fmt.Sprintf("--log-level=%s", m.LogLevelFlag.String())) - args = append(args, fmt.Sprintf("--log-format=%s", m.FormatFlag.String())) + args = append(args, fmt.Sprintf("--log-level=%s", logLevel.String())) + args = append(args, fmt.Sprintf("--log-format=%s", logFormat.String())) // build the maintenance job job := &batchv1.Job{ @@ -156,9 +183,14 @@ func buildMaintenanceJob(m MaintenanceConfig, param provider.RepoParam, cli clie }, } - if affinity := veleroutil.GetAffinityFromVeleroServer(deployment); affinity != nil { - job.Spec.Template.Spec.Affinity = affinity + var affinity *v1.Affinity + if config != nil && len(config.LoadAffinities) > 0 { + affinity = csiutil.ToSystemAffinity(config.LoadAffinities) + } + if affinity == nil { + affinity = veleroutil.GetAffinityFromVeleroServer(deployment) } + job.Spec.Template.Spec.Affinity = affinity if tolerations := veleroutil.GetTolerationsFromVeleroServer(deployment); tolerations != nil { job.Spec.Template.Spec.Tolerations = 
tolerations
@@ -262,3 +294,97 @@ func getLatestMaintenanceJob(cli client.Client, ns string) (*batchv1.Job, error)
 
 	return &jobList.Items[0], nil
 }
+
+// getMaintenanceJobConfig gets the maintenance Job config for the
+// BackupRepository specified by the repo parameter.
+//
+// Params:
+//
+//	veleroNamespace: the namespace Velero is installed in. It's used to retrieve the Job configuration ConfigMap.
+//	repo: the BackupRepository whose Job config is requested.
+func getMaintenanceJobConfig(
+	ctx context.Context,
+	cli client.Client,
+	logger logrus.FieldLogger,
+	veleroNamespace string,
+	repoMaintenanceJobConfig string,
+	repo *velerov1api.BackupRepository,
+) (*JobConfigs, error) {
+	var cm v1.ConfigMap
+	if err := cli.Get(
+		ctx,
+		types.NamespacedName{
+			Namespace: veleroNamespace,
+			Name:      repoMaintenanceJobConfig,
+		},
+		&cm,
+	); err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, nil
+		} else {
+			return nil, errors.Wrapf(
+				err,
+				"fail to get repo maintenance job configs %s", repoMaintenanceJobConfig)
+		}
+	}
+
+	if cm.Data == nil {
+		return nil, errors.Errorf("data is not available in config map %s", repoMaintenanceJobConfig)
+	}
+
+	configMap := make(map[string]JobConfigs)
+	for k, v := range cm.Data {
+		var config JobConfigs
+		if err := json.Unmarshal([]byte(v), &config); err != nil {
+			return nil, errors.Wrapf(err, "fail to unmarshal configs from %s", repoMaintenanceJobConfig)
+		}
+		configMap[k] = config
+	}
+
+	// Generate the BackupRepository key.
+	// Using the BackupRepository name as the key would be more intuitive,
+	// but the BackupRepository generation is dynamic. We cannot assume
+	// the BackupRepositories are ready when installing Velero.
+	// Instead we use the volume source namespace, BSL name, and the uploader
+	// type to represent the BackupRepository. The combination of those three
+	// keys can identify a unique BackupRepository.
+ repoJobConfigKey := repo.Spec.VolumeNamespace + "-" + + repo.Spec.BackupStorageLocation + "-" + repo.Spec.RepositoryType + + var result *JobConfigs + if _, ok := configMap[repoJobConfigKey]; ok { + logger.Debugf("Find the repo maintenance config %s for repo %s", repoJobConfigKey, repo.Name) + result = new(JobConfigs) + *result = configMap[repoJobConfigKey] + } + + if _, ok := configMap["global"]; ok { + logger.Debugf("Find the global repo maintenance config for repo %s", repo.Name) + + if result == nil { + result = new(JobConfigs) + } + + if result.CPULimit == "" { + result.CPULimit = configMap["global"].CPULimit + } + if result.CPURequest == "" { + result.CPURequest = configMap["global"].CPURequest + } + if result.MemLimit == "" { + result.MemLimit = configMap["global"].MemLimit + } + if result.MemRequest == "" { + result.MemRequest = configMap["global"].MemRequest + } + if len(result.LoadAffinities) == 0 { + result.LoadAffinities = configMap["global"].LoadAffinities + } + } + + if result != nil { + logger.Debugf("The content of the Maintenance Job Config: %+v", *result) + } + + return result, nil +} diff --git a/pkg/repository/maintenance_test.go b/pkg/repository/maintenance_test.go index ca88c1577d5..d7cd293bb40 100644 --- a/pkg/repository/maintenance_test.go +++ b/pkg/repository/maintenance_test.go @@ -25,6 +25,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -36,6 +37,8 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/repository/provider" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + velerotypes "github.com/vmware-tanzu/velero/pkg/types" "github.com/vmware-tanzu/velero/pkg/util/logging" ) @@ -266,20 +269,20 @@ func TestGetLatestMaintenanceJob(t *testing.T) { func TestBuildMaintenanceJob(t *testing.T) { testCases := []struct { name string - m MaintenanceConfig + m *JobConfigs deploy *appsv1.Deployment + logLevel logrus.Level + logFormat *logging.FormatFlag expectedJobName string expectedError bool }{ { name: "Valid maintenance job", - m: MaintenanceConfig{ - CPURequest: "100m", - MemRequest: "128Mi", - CPULimit: "200m", - MemLimit: "256Mi", - LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), - FormatFlag: logging.NewFormatFlag(), + m: &JobConfigs{ + CPURequest: "100m", + MemRequest: "128Mi", + CPULimit: "200m", + MemLimit: "256Mi", }, deploy: &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -299,19 +302,21 @@ func TestBuildMaintenanceJob(t *testing.T) { }, }, }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), expectedJobName: "test-123-maintain-job", expectedError: false, }, { name: "Error getting Velero server deployment", - m: MaintenanceConfig{ - CPURequest: "100m", - MemRequest: "128Mi", - CPULimit: "200m", - MemLimit: "256Mi", - LogLevelFlag: logging.LogLevelFlag(logrus.InfoLevel), - FormatFlag: logging.NewFormatFlag(), + m: &JobConfigs{ + CPURequest: "100m", + MemRequest: "128Mi", + CPULimit: "200m", + MemLimit: "256Mi", }, + logLevel: logrus.InfoLevel, + logFormat: logging.NewFormatFlag(), expectedJobName: "", expectedError: true, }, @@ -350,7 +355,7 @@ func TestBuildMaintenanceJob(t *testing.T) { cli := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build() // Call the function to test - job, err := buildMaintenanceJob(tc.m, param, cli, "velero") + job, err := 
buildMaintenanceJob(tc.m, param, cli, "velero", logrus.InfoLevel, logging.NewFormatFlag()) // Check the error if tc.expectedError { @@ -389,8 +394,8 @@ func TestBuildMaintenanceJob(t *testing.T) { fmt.Sprintf("--repo-name=%s", param.BackupRepo.Spec.VolumeNamespace), fmt.Sprintf("--repo-type=%s", param.BackupRepo.Spec.RepositoryType), fmt.Sprintf("--backup-storage-location=%s", param.BackupLocation.Name), - fmt.Sprintf("--log-level=%s", tc.m.LogLevelFlag.String()), - fmt.Sprintf("--log-format=%s", tc.m.FormatFlag.String()), + fmt.Sprintf("--log-level=%s", tc.logLevel.String()), + fmt.Sprintf("--log-format=%s", tc.logFormat.String()), } assert.Equal(t, expectedArgs, container.Args) @@ -406,3 +411,172 @@ func TestBuildMaintenanceJob(t *testing.T) { }) } } + +func TestGetMaintenanceJobConfig(t *testing.T) { + ctx := context.Background() + logger := logrus.New() + veleroNamespace := "velero" + repoMaintenanceJobConfig := "repo-maintenance-job-config" + repo := &velerov1api.BackupRepository{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Spec: velerov1api.BackupRepositorySpec{ + BackupStorageLocation: "default", + RepositoryType: "kopia", + VolumeNamespace: "test", + }, + } + + testCases := []struct { + name string + repoJobConfig *v1.ConfigMap + expectedConfig *JobConfigs + expectedError error + }{ + { + name: "Config not exist", + expectedConfig: nil, + expectedError: nil, + }, + { + name: "Invalid JSON", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "test-default-kopia": "{\"cpuRequest:\"100m\"}", + }, + }, + expectedConfig: nil, + expectedError: fmt.Errorf("fail to unmarshal configs from %s", repoMaintenanceJobConfig), + }, + { + name: "Find config specific for BackupRepository", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "test-default-kopia": "{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memRequest\":\"100Mi\",\"memLimit\":\"200Mi\",\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + CPURequest: "100m", + CPULimit: "200m", + MemRequest: "100Mi", + MemLimit: "200Mi", + LoadAffinities: []*velerotypes.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"e2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + { + name: "Find config specific for global", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "global": "{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memRequest\":\"50Mi\",\"memLimit\":\"100Mi\",\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + CPURequest: "50m", + CPULimit: "100m", + MemRequest: "50Mi", + MemLimit: "100Mi", + LoadAffinities: []*velerotypes.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: 
metav1.LabelSelectorOpIn, + Values: []string{"n2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + { + name: "Specific config supersede global config", + repoJobConfig: &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: veleroNamespace, + Name: repoMaintenanceJobConfig, + }, + Data: map[string]string{ + "global": "{\"cpuRequest\":\"50m\",\"cpuLimit\":\"100m\",\"memRequest\":\"50Mi\",\"memLimit\":\"100Mi\",\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"n2\"]}]}}]}", + "test-default-kopia": "{\"cpuRequest\":\"100m\",\"cpuLimit\":\"200m\",\"memRequest\":\"100Mi\",\"memLimit\":\"200Mi\",\"loadAffinity\":[{\"nodeSelector\":{\"matchExpressions\":[{\"key\":\"cloud.google.com/machine-family\",\"operator\":\"In\",\"values\":[\"e2\"]}]}}]}", + }, + }, + expectedConfig: &JobConfigs{ + CPURequest: "100m", + CPULimit: "200m", + MemRequest: "100Mi", + MemLimit: "200Mi", + LoadAffinities: []*velerotypes.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud.google.com/machine-family", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"e2"}, + }, + }, + }, + }, + }, + }, + expectedError: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var fakeClient client.Client + if tc.repoJobConfig != nil { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t, tc.repoJobConfig) + } else { + fakeClient = velerotest.NewFakeControllerRuntimeClient(t) + } + + jobConfig, err := getMaintenanceJobConfig( + ctx, + fakeClient, + logger, + veleroNamespace, + repoMaintenanceJobConfig, + repo, + ) + + if tc.expectedError != nil { + require.Contains(t, err.Error(), tc.expectedError.Error()) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.expectedConfig, jobConfig) + }) + } +} diff --git a/pkg/repository/manager.go b/pkg/repository/manager.go index b5875a7355a..828ebf33cb0 100644 --- a/pkg/repository/manager.go +++ b/pkg/repository/manager.go @@ -29,6 +29,7 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" + "github.com/vmware-tanzu/velero/pkg/util/logging" ) // SnapshotIdentifier uniquely identifies a snapshot @@ -92,14 +93,18 @@ type Manager interface { } type manager struct { - namespace string - providers map[string]provider.Provider - client client.Client - repoLocker *RepoLocker - repoEnsurer *Ensurer - fileSystem filesystem.Interface - maintenanceCfg MaintenanceConfig - log logrus.FieldLogger + namespace string + providers map[string]provider.Provider + // client is the Velero controller manager's client. + // It's limited to resources in the Velero namespace. + client client.Client + repoLocker *RepoLocker + repoEnsurer *Ensurer + fileSystem filesystem.Interface + repoMaintenanceJobConfig string + log logrus.FieldLogger + logLevel logrus.Level + logFormat *logging.FormatFlag } // NewManager create a new repository manager. 
@@ -110,18 +115,22 @@ func NewManager(
 	repoEnsurer *Ensurer,
 	credentialFileStore credentials.FileStore,
 	credentialSecretStore credentials.SecretStore,
-	maintenanceCfg MaintenanceConfig,
+	repoMaintenanceJobConfig string,
 	log logrus.FieldLogger,
+	logLevel logrus.Level,
+	logFormat *logging.FormatFlag,
 ) Manager {
 	mgr := &manager{
-		namespace:      namespace,
-		client:         client,
-		providers:      map[string]provider.Provider{},
-		repoLocker:     repoLocker,
-		repoEnsurer:    repoEnsurer,
-		fileSystem:     filesystem.NewFileSystem(),
-		maintenanceCfg: maintenanceCfg,
-		log:            log,
+		namespace:                namespace,
+		client:                   client,
+		providers:                map[string]provider.Provider{},
+		repoLocker:               repoLocker,
+		repoEnsurer:              repoEnsurer,
+		fileSystem:               filesystem.NewFileSystem(),
+		repoMaintenanceJobConfig: repoMaintenanceJobConfig,
+		log:                      log,
+		logLevel:                 logLevel,
+		logFormat:                logFormat,
 	}
 
 	mgr.providers[velerov1api.BackupRepositoryTypeRestic] = provider.NewResticRepositoryProvider(credentialFileStore, mgr.fileSystem, mgr.log)
@@ -204,9 +213,28 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
 		return nil
 	}
 
+	jobConfig, err := getMaintenanceJobConfig(
+		context.Background(),
+		m.client,
+		m.log,
+		m.namespace,
+		m.repoMaintenanceJobConfig,
+		repo,
+	)
+	if err != nil {
+		log.Infof("Cannot find the repo-maintenance-job-config ConfigMap: %s. Using default values.", err.Error())
+	}
+
 	log.Info("Start to maintenance repo")
 
-	maintenanceJob, err := buildMaintenanceJob(m.maintenanceCfg, param, m.client, m.namespace)
+	maintenanceJob, err := buildMaintenanceJob(
+		jobConfig,
+		param,
+		m.client,
+		m.namespace,
+		m.logLevel,
+		m.logFormat,
+	)
 	if err != nil {
 		return errors.Wrap(err, "error to build maintenance job")
 	}
@@ -219,8 +247,11 @@ func (m *manager) PruneRepo(repo *velerov1api.BackupRepository) error {
 	log.Debug("Creating maintenance job")
 
 	defer func() {
-		if err := deleteOldMaintenanceJobs(m.client, param.BackupRepo.Name,
-			m.maintenanceCfg.KeepLatestMaitenanceJobs); err != nil {
+		if err := deleteOldMaintenanceJobs(
+			m.client,
+			param.BackupRepo.Name,
+			DefaultKeepLatestMaintenanceJobs,
+		); err != nil {
 			log.WithError(err).Error("Failed to delete maintenance job")
 		}
 	}()
diff --git a/pkg/repository/manager_test.go b/pkg/repository/manager_test.go
index 07b51fc7486..fbdfede54e5 100644
--- a/pkg/repository/manager_test.go
+++ b/pkg/repository/manager_test.go
@@ -19,6 +19,7 @@ package repository
 import (
 	"testing"
 
+	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -28,7 +29,7 @@ import (
 
 func TestGetRepositoryProvider(t *testing.T) {
 	var fakeClient kbclient.Client
-	mgr := NewManager("", fakeClient, nil, nil, nil, nil, MaintenanceConfig{}, nil).(*manager)
+	mgr := NewManager("", fakeClient, nil, nil, nil, nil, "", nil, logrus.InfoLevel, nil).(*manager)
 	repo := &velerov1.BackupRepository{}
 
 	// empty repository type
diff --git a/pkg/types/common.go b/pkg/types/common.go
new file mode 100644
index 00000000000..f295e8eb1ba
--- /dev/null
+++ b/pkg/types/common.go
@@ -0,0 +1,28 @@
+/*
+Copyright the Velero contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This package contains the common types and structures shared by +// different packages. +package types + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type LoadAffinity struct { + // NodeSelector specifies the label selector to match nodes + NodeSelector metav1.LabelSelector `json:"nodeSelector"` +} diff --git a/pkg/util/csi/util.go b/pkg/util/csi/util.go index bcc424d1bf9..9b4db9850d0 100644 --- a/pkg/util/csi/util.go +++ b/pkg/util/csi/util.go @@ -19,8 +19,11 @@ package csi import ( "strings" + corev1 "k8s.io/api/core/v1" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/features" + "github.com/vmware-tanzu/velero/pkg/types" ) const ( @@ -30,3 +33,48 @@ const ( func ShouldSkipAction(actionName string) bool { return !features.IsEnabled(velerov1api.CSIFeatureFlag) && strings.Contains(actionName, csiPluginNamePrefix) } + +func ToSystemAffinity(loadAffinities []*types.LoadAffinity) *corev1.Affinity { + if len(loadAffinities) == 0 { + return nil + } + + result := new(corev1.Affinity) + result.NodeAffinity = new(corev1.NodeAffinity) + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = new(corev1.NodeSelector) + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = + make([]corev1.NodeSelectorTerm, 0) + + for _, loadAffinity := range loadAffinities { + requirements := []corev1.NodeSelectorRequirement{} + for k, v := range loadAffinity.NodeSelector.MatchLabels { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: k, + Values: []string{v}, + Operator: corev1.NodeSelectorOpIn, + }) + } + + for _, exp := range loadAffinity.NodeSelector.MatchExpressions { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: exp.Key, + Values: exp.Values, + Operator: corev1.NodeSelectorOperator(exp.Operator), + }) + } + + if len(requirements) == 0 { + return nil + } + + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = + append( + result.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms, + corev1.NodeSelectorTerm{ + MatchExpressions: requirements, + }, + ) + } + + return result +} diff --git a/pkg/util/csi/util_test.go b/pkg/util/csi/util_test.go index 8a3ea3cc1ca..0738440dc1b 100644 --- a/pkg/util/csi/util_test.go +++ b/pkg/util/csi/util_test.go @@ -17,11 +17,16 @@ limitations under the License. 
package csi import ( + "reflect" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/vmware-tanzu/velero/pkg/features" + "github.com/vmware-tanzu/velero/pkg/types" ) func TestCSIFeatureNotEnabledAndPluginIsFromCSI(t *testing.T) { @@ -33,3 +38,106 @@ func TestCSIFeatureNotEnabledAndPluginIsFromCSI(t *testing.T) { require.True(t, ShouldSkipAction("velero.io/csi-pvc-backupper")) require.False(t, ShouldSkipAction("abc")) } + +func TestToSystemAffinity(t *testing.T) { + tests := []struct { + name string + loadAffinities []*types.LoadAffinity + expected *corev1.Affinity + }{ + { + name: "loadAffinity is nil", + }, + { + name: "loadAffinity is empty", + loadAffinities: []*types.LoadAffinity{}, + }, + { + name: "with match label", + loadAffinities: []*types.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key-1": "value-1", + }, + }, + }, + }, + expected: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "key-1", + Values: []string{"value-1"}, + Operator: corev1.NodeSelectorOpIn, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "with match expression", + loadAffinities: []*types.LoadAffinity{ + { + NodeSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key-2": "value-2", + }, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "key-3", + Values: []string{"value-3-1", "value-3-2"}, + Operator: metav1.LabelSelectorOpNotIn, + }, + { + Key: "key-4", + Values: []string{"value-4-1", "value-4-2", "value-4-3"}, + Operator: metav1.LabelSelectorOpDoesNotExist, + }, + }, + }, + }, + }, + expected: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "key-2", + Values: []string{"value-2"}, + Operator: corev1.NodeSelectorOpIn, + }, + { + Key: "key-3", + Values: []string{"value-3-1", "value-3-2"}, + Operator: corev1.NodeSelectorOpNotIn, + }, + { + Key: "key-4", + Values: []string{"value-4-1", "value-4-2", "value-4-3"}, + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + affinity := ToSystemAffinity(test.loadAffinities) + assert.True(t, reflect.DeepEqual(affinity, test.expected)) + }) + } +}
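
To close, a minimal usage sketch (not part of the patch) for the relocated `ToSystemAffinity` helper; the zone label and its value are arbitrary examples:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/vmware-tanzu/velero/pkg/types"
	"github.com/vmware-tanzu/velero/pkg/util/csi"
)

func main() {
	// Each LoadAffinity entry becomes one NodeSelectorTerm; Kubernetes ORs
	// the terms, while the requirements inside a single term are ANDed.
	affinities := []*types.LoadAffinity{
		{
			NodeSelector: metav1.LabelSelector{
				// Example label only; MatchExpressions work as well.
				MatchLabels: map[string]string{"topology.kubernetes.io/zone": "us-east-1a"},
			},
		},
	}

	affinity := csi.ToSystemAffinity(affinities)
	fmt.Printf("%+v\n", affinity)
}
```

The same helper now backs both the CSI snapshot exposer's backup pods and the repository maintenance Jobs, which is why this patch moves it from `pkg/exposer` into the shared `pkg/util/csi` package.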