
Merge pull request #407 from Xieql/init-restore
backup: init restore controller
kurator-bot authored Oct 23, 2023
2 parents 051d3c5 + 972a0db commit d08adb9
Showing 8 changed files with 587 additions and 7 deletions.
146 changes: 146 additions & 0 deletions pkg/fleet-manager/backup_restore_migrate_shared.go
@@ -15,6 +15,7 @@ package fleet

import (
    "context"
    "errors"
    "fmt"
    "reflect"
    "sort"
@@ -311,3 +312,148 @@ func listResourcesFromClusterClient(ctx context.Context, namespace string, label
    }
    return clusterClient.List(ctx, objList, opts)
}

// fetchRestoreDestinationClusters retrieves the destination clusters for a restore operation.
// It first fetches the referred backup and then determines the destination clusters based on the restore and backup specifications:
// If the restore destination is not set, it returns the clusters from the backup destination.
// If set, it ensures that the restore destination is a subset of the backup destination.
//
// Returns:
// - string: The name of the fleet where the restore's set of fleetClusters resides.
// - map[ClusterKey]*fleetCluster: A map of cluster keys to fleet clusters.
// - error: An error object indicating any issues encountered during the operation.
func (r *RestoreManager) fetchRestoreDestinationClusters(ctx context.Context, restore *backupapi.Restore) (string, map[ClusterKey]*fleetCluster, error) {
    // Retrieve the referred backup in the current Kurator host cluster.
    key := client.ObjectKey{
        Name:      restore.Spec.BackupName,
        Namespace: restore.Namespace,
    }
    referredBackup := &backupapi.Backup{}
    if err := r.Client.Get(ctx, key, referredBackup); err != nil {
        return "", nil, fmt.Errorf("failed to retrieve the referred backup '%s': %w", restore.Spec.BackupName, err)
    }

    // Get the base clusters from the referred backup.
    baseClusters, err := fetchDestinationClusters(ctx, r.Client, restore.Namespace, referredBackup.Spec.Destination)
    if err != nil {
        return "", nil, fmt.Errorf("failed to fetch fleet clusters for the backup '%s': %w", referredBackup.Name, err)
    }

    // If the restore destination is not set, return the base fleet clusters directly.
    if restore.Spec.Destination == nil {
        return referredBackup.Spec.Destination.Fleet, baseClusters, nil
    }

    // If the restore destination is set, try to get the clusters from the restore destination.
    restoreClusters, err := fetchDestinationClusters(ctx, r.Client, restore.Namespace, *restore.Spec.Destination)
    if err != nil {
        return "", nil, fmt.Errorf("failed to fetch restore clusters for the restore '%s': %w", restore.Name, err)
    }

    // Check that the restore's fleet and clusters match those of the referred backup.
    if referredBackup.Spec.Destination.Fleet != restore.Spec.Destination.Fleet {
        // If we guarantee that there is only one fleet per namespace, this error should never occur.
        return "", nil, errors.New("the restore destination fleet must be the same as the backup's")
    }

    // In our design, the restore destination must be a subset of the backup destination.
    if !isFleetClusterSubset(baseClusters, restoreClusters) {
        return "", nil, errors.New("the restore clusters must be a subset of the base clusters")
    }

    return restore.Spec.Destination.Fleet, restoreClusters, nil
}
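
For illustration, a minimal, standalone sketch of the destination-resolution rule described above, using simplified stand-in types rather than the Kurator API types (the fleet and cluster names are hypothetical):

package main

import (
    "errors"
    "fmt"
)

// destination is a simplified stand-in for the backup/restore destination types.
type destination struct {
    Fleet    string
    Clusters []string
}

// resolveDestination mirrors the rule above: default to the backup's destination when the
// restore leaves it unset; otherwise require the same fleet and a subset of its clusters.
func resolveDestination(backupDest destination, restoreDest *destination) (destination, error) {
    if restoreDest == nil {
        return backupDest, nil
    }
    if restoreDest.Fleet != backupDest.Fleet {
        return destination{}, errors.New("the restore destination fleet must be the same as the backup's")
    }
    base := make(map[string]bool)
    for _, c := range backupDest.Clusters {
        base[c] = true
    }
    for _, c := range restoreDest.Clusters {
        if !base[c] {
            return destination{}, errors.New("the restore clusters must be a subset of the base clusters")
        }
    }
    return *restoreDest, nil
}

func main() {
    backup := destination{Fleet: "quickstart", Clusters: []string{"member1", "member2"}}
    got, err := resolveDestination(backup, &destination{Fleet: "quickstart", Clusters: []string{"member1"}})
    fmt.Println(got.Clusters, err) // [member1] <nil>
}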

// isFleetClusterSubset is a helper that checks whether one set of clusters is a subset of another.
func isFleetClusterSubset(baseClusters, subsetClusters map[ClusterKey]*fleetCluster) bool {
    for key := range subsetClusters {
        if _, exists := baseClusters[key]; !exists {
            return false
        }
    }
    return true
}

// buildVeleroRestoreInstance constructs a Velero Restore instance configured to perform a restore operation on the specified cluster.
func buildVeleroRestoreInstance(restoreSpec *backupapi.RestoreSpec, labels map[string]string, veleroBackupName, veleroRestoreName string) *velerov1.Restore {
    veleroRestore := &velerov1.Restore{
        ObjectMeta: generateVeleroResourceObjectMeta(veleroRestoreName, labels),
        Spec: velerov1.RestoreSpec{
            BackupName: veleroBackupName,
        },
    }
    if restoreSpec.Policy != nil {
        veleroRestore.Spec.NamespaceMapping = restoreSpec.Policy.NamespaceMapping
        veleroRestore.Spec.PreserveNodePorts = restoreSpec.Policy.PreserveNodePorts
        // Note: in Velero, the restore does not contain a namespace scope filter.
        if restoreSpec.Policy.ResourceFilter != nil {
            veleroRestore.Spec.IncludedNamespaces = restoreSpec.Policy.ResourceFilter.IncludedNamespaces
            veleroRestore.Spec.ExcludedNamespaces = restoreSpec.Policy.ResourceFilter.ExcludedNamespaces
            veleroRestore.Spec.IncludedResources = restoreSpec.Policy.ResourceFilter.IncludedResources
            veleroRestore.Spec.ExcludedResources = restoreSpec.Policy.ResourceFilter.ExcludedResources
            veleroRestore.Spec.IncludeClusterResources = restoreSpec.Policy.ResourceFilter.IncludeClusterResources
            veleroRestore.Spec.LabelSelector = restoreSpec.Policy.ResourceFilter.LabelSelector
            veleroRestore.Spec.OrLabelSelectors = restoreSpec.Policy.ResourceFilter.OrLabelSelectors
        }
        if restoreSpec.Policy.PreserveStatus != nil {
            veleroRestore.Spec.RestoreStatus = &velerov1.RestoreStatusSpec{
                IncludedResources: restoreSpec.Policy.PreserveStatus.IncludedResources,
                ExcludedResources: restoreSpec.Policy.PreserveStatus.ExcludedResources,
            }
        }
    }

    return veleroRestore
}
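
As a rough usage sketch, this is the shape of the Velero Restore object the helper above produces; it assumes the upstream Velero API package path, and the object name, backup name, labels, and the "velero" namespace are illustrative only:

package main

import (
    "fmt"

    velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    preserveNodePorts := true
    restore := &velerov1.Restore{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "restore-example",                  // hypothetical veleroRestoreName
            Namespace: "velero",                           // assumed Velero install namespace
            Labels:    map[string]string{"app": "example"}, // illustrative labels
        },
        Spec: velerov1.RestoreSpec{
            BackupName:         "backup-example", // hypothetical veleroBackupName
            PreserveNodePorts:  &preserveNodePorts,
            IncludedNamespaces: []string{"default"},
        },
    }
    fmt.Println(restore.Name, restore.Spec.BackupName)
}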

// allRestoreCompleted checks if all restore operations are completed by inspecting the phase of each RestoreDetails instance in the provided slice.
func allRestoreCompleted(clusterDetails []*backupapi.RestoreDetails) bool {
    for _, detail := range clusterDetails {
        if detail.RestoreStatusInCluster == nil || detail.RestoreStatusInCluster.Phase != velerov1.RestorePhaseCompleted {
            return false
        }
    }
    return true
}
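
A standalone sketch of the completion check, with a simplified stand-in for backupapi.RestoreDetails; only the phase matters here, and the nil-status case handled above is omitted:

package main

import "fmt"

// restoreDetail is a simplified stand-in; Phase mirrors RestoreStatusInCluster.Phase.
type restoreDetail struct {
    Phase string
}

func allCompleted(details []restoreDetail) bool {
    for _, d := range details {
        if d.Phase != "Completed" { // velerov1.RestorePhaseCompleted
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(allCompleted([]restoreDetail{{Phase: "Completed"}, {Phase: "InProgress"}})) // false
    fmt.Println(allCompleted([]restoreDetail{{Phase: "Completed"}, {Phase: "Completed"}}))  // true
}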

// syncVeleroRestoreStatus synchronizes the status of Velero restore resources across different clusters.
// Note: it returns the modified clusterDetails so that internal changes remain visible to the caller, given Go's slice semantics.
func syncVeleroRestoreStatus(ctx context.Context, destinationClusters map[ClusterKey]*fleetCluster, clusterDetails []*backupapi.RestoreDetails, creatorKind, creatorNamespace, creatorName string) ([]*backupapi.RestoreDetails, error) {
    log := ctrl.LoggerFrom(ctx)

    if clusterDetails == nil {
        clusterDetails = []*backupapi.RestoreDetails{}
    }

    // Initialize a map that stores the currently recorded Velero restore status of each cluster.
    // The combination of detail.ClusterName, detail.ClusterKind, and detail.RestoreNameInCluster uniquely identifies a Velero restore object.
    statusMap := make(map[string]*backupapi.RestoreDetails)
    for _, detail := range clusterDetails {
        key := fmt.Sprintf("%s-%s-%s", detail.ClusterName, detail.ClusterKind, detail.RestoreNameInCluster)
        statusMap[key] = detail
    }
    // Loop through each target cluster and retrieve the status of its Velero restore resources using the client associated with that cluster.
    for clusterKey, clusterAccess := range destinationClusters {
        name := generateVeleroResourceName(clusterKey.Name, creatorKind, creatorNamespace, creatorName)
        veleroRestore := &velerov1.Restore{}
        err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroRestore)
        if err != nil {
            log.Error(err, "failed to get velero restore instance for sync status", "restoreName", name)
            return nil, err
        }

        key := fmt.Sprintf("%s-%s-%s", clusterKey.Name, clusterKey.Kind, veleroRestore.Name)
        if detail, exists := statusMap[key]; exists {
            detail.RestoreStatusInCluster = &veleroRestore.Status
        } else {
            currentRestoreDetails := &backupapi.RestoreDetails{
                ClusterName:            clusterKey.Name,
                ClusterKind:            clusterKey.Kind,
                RestoreNameInCluster:   veleroRestore.Name,
                RestoreStatusInCluster: &veleroRestore.Status,
            }
            clusterDetails = append(clusterDetails, currentRestoreDetails)
        }
    }

    return clusterDetails, nil
}
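
A minimal sketch of the keying scheme used above: the clusterName-clusterKind-restoreName string identifies one Velero restore per cluster, so a recorded entry is updated in place and an unseen cluster is appended. Types and names are simplified stand-ins:

package main

import "fmt"

// detail is a simplified stand-in for backupapi.RestoreDetails.
type detail struct {
    ClusterName, ClusterKind, RestoreName, Phase string
}

func main() {
    recorded := []*detail{
        {ClusterName: "member1", ClusterKind: "AttachedCluster", RestoreName: "restore-a", Phase: "InProgress"},
    }

    // Index the recorded details by the composite key.
    statusMap := make(map[string]*detail)
    for _, d := range recorded {
        statusMap[fmt.Sprintf("%s-%s-%s", d.ClusterName, d.ClusterKind, d.RestoreName)] = d
    }

    // A fresh status observed from a member cluster.
    key := fmt.Sprintf("%s-%s-%s", "member1", "AttachedCluster", "restore-a")
    if d, ok := statusMap[key]; ok {
        d.Phase = "Completed" // existing entry: update the recorded status in place
    } else {
        recorded = append(recorded, &detail{ClusterName: "member1", ClusterKind: "AttachedCluster", RestoreName: "restore-a", Phase: "Completed"})
    }

    fmt.Println(recorded[0].Phase) // Completed
}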