Merge pull request #908 from skriss/v0.9.7-cherrypicks
V0.9.7 cherrypicks
wwitzel3 authored Oct 4, 2018
2 parents ad61989 + ee00ce4 commit caa962f
Showing 8 changed files with 236 additions and 77 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# Changelog

+ #### [v0.9.7](https://github.com/heptio/ark/releases/tag/v0.9.7) - 2018-10-04

+ #### Bug Fixes
+ * Preserve explicitly-specified node ports during restore (#712, @timoreimann)
+ * Enable restoring resources with ownerReference set (#837, @mwieczorek)
+ * Fix error when restoring ExternalName services (#869, @shubheksha)
+ * Remove restore log helper for accurate line numbers (#891, @skriss)
+ * Display backup StartTimestamp in `ark backup get` output (#894, @marctc)
+ * Fix restic restores when using namespace mappings (#900, @skriss)

#### [v0.9.6](https://github.com/heptio/ark/releases/tag/v0.9.6) - 2018-09-21

#### Bug Fixes
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
@@ -381,6 +381,7 @@ var defaultResourcePriorities = []string{
"serviceaccounts",
"limitranges",
"pods",
"replicaset",
}

func applyConfigDefaults(c *api.Config, logger logrus.FieldLogger) {
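The added entry extends `defaultResourcePriorities`, the list that controls restore ordering: resources named in the list are restored first, in the order given, and all remaining resources follow alphabetically. A minimal sketch of that ordering logic (illustrative only, not Ark's actual implementation), using the priorities shown above:

```go
package main

import (
	"fmt"
	"sort"
)

// prioritizeResources orders resource names so that entries found in
// priorities come first (in priority order), with the remainder sorted
// alphabetically afterwards.
func prioritizeResources(priorities, resources []string) []string {
	priorityIndex := make(map[string]int, len(priorities))
	for i, p := range priorities {
		priorityIndex[p] = i
	}

	var prioritized, remainder []string
	for _, r := range resources {
		if _, ok := priorityIndex[r]; ok {
			prioritized = append(prioritized, r)
		} else {
			remainder = append(remainder, r)
		}
	}

	sort.Slice(prioritized, func(i, j int) bool {
		return priorityIndex[prioritized[i]] < priorityIndex[prioritized[j]]
	})
	sort.Strings(remainder)

	return append(prioritized, remainder...)
}

func main() {
	priorities := []string{"serviceaccounts", "limitranges", "pods", "replicaset"}
	backedUp := []string{"deployments", "pods", "replicaset", "serviceaccounts"}
	fmt.Println(prioritizeResources(priorities, backedUp))
	// Output: [serviceaccounts pods replicaset deployments]
}
```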
2 changes: 1 addition & 1 deletion pkg/cmd/util/output/backup_printer.go
@@ -92,7 +92,7 @@ func printBackup(backup *v1.Backup, w io.Writer, options printers.PrintOptions)
status = "Deleting"
}

- if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s", name, status, backup.CreationTimestamp.Time, humanReadableTimeFromNow(expiration), metav1.FormatLabelSelector(backup.Spec.LabelSelector)); err != nil {
+ if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s", name, status, backup.Status.StartTimestamp.Time, humanReadableTimeFromNow(expiration), metav1.FormatLabelSelector(backup.Spec.LabelSelector)); err != nil {
return err
}

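This one-line change makes `ark backup get` print the backup's `Status.StartTimestamp` instead of the Backup object's `CreationTimestamp` (changelog entry #894). The two can differ, for example when a Backup resource is recreated in the cluster after being synced back from object storage. A self-contained sketch of the tabwriter-based output, with hypothetical values and illustrative column headers:

```go
package main

import (
	"fmt"
	"os"
	"text/tabwriter"
	"time"
)

func main() {
	// Hypothetical values: the Backup object was synced into the cluster on
	// Oct 4, but the backup itself started on Sep 21. After the change above,
	// the printed timestamp reflects the latter.
	creationTimestamp := time.Date(2018, time.October, 4, 12, 0, 0, 0, time.UTC)
	startTimestamp := time.Date(2018, time.September, 21, 8, 30, 0, 0, time.UTC)
	_ = creationTimestamp // no longer what the command displays

	// Column headers are illustrative; they may not match ark's exact output.
	w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
	fmt.Fprintln(w, "NAME\tSTATUS\tCREATED\tEXPIRES")
	fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", "nginx-backup", "Completed", startTimestamp, "29d")
	w.Flush()
}
```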
6 changes: 3 additions & 3 deletions pkg/restic/restorer.go
@@ -34,7 +34,7 @@ import (
// Restorer can execute restic restores of volumes in a pod.
type Restorer interface {
// RestorePodVolumes restores all annotated volumes in a pod.
- RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, log logrus.FieldLogger) []error
+ RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error
}

type restorer struct {
@@ -84,14 +84,14 @@ func newRestorer(
return r
}

- func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, log logrus.FieldLogger) []error {
+ func (r *restorer) RestorePodVolumes(restore *arkv1api.Restore, pod *corev1api.Pod, sourceNamespace string, log logrus.FieldLogger) []error {
// get volumes to restore from pod's annotations
volumesToRestore := GetPodSnapshotAnnotations(pod)
if len(volumesToRestore) == 0 {
return nil
}

- repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, pod.Namespace)
+ repo, err := r.repoEnsurer.EnsureRepo(r.ctx, restore.Namespace, sourceNamespace)
if err != nil {
return []error{err}
}
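This is the restic half of changelog entry #900: restic repositories are keyed by the namespace that was backed up, so when a restore remaps namespaces, looking the repository up by the restored pod's (target) namespace fails. The new parameter lets the caller pass the source namespace explicitly. A hedged sketch of the failure mode, with hypothetical names standing in for Ark's repository-ensurer types:

```go
package main

import "fmt"

// Hypothetical stand-in for Ark's repository lookup: restic repositories
// are keyed by the namespace that was originally backed up.
var reposByNamespace = map[string]string{"prod": "restic-repo-prod"}

func ensureRepo(sourceNamespace string) (string, error) {
	repo, ok := reposByNamespace[sourceNamespace]
	if !ok {
		return "", fmt.Errorf("no restic repository for namespace %q", sourceNamespace)
	}
	return repo, nil
}

func main() {
	// The restore maps namespace "prod" to "prod-copy", so the restored
	// pod's namespace is the target. Looking the repo up by pod namespace,
	// as the old code effectively did, fails:
	if _, err := ensureRepo("prod-copy"); err != nil {
		fmt.Println("old behavior:", err)
	}

	// Passing the original (source) namespace, as the new parameter allows,
	// finds the repository:
	repo, _ := ensureRepo("prod")
	fmt.Println("new behavior: using", repo)
}
```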
92 changes: 40 additions & 52 deletions pkg/restore/restore.go
@@ -243,7 +243,7 @@ func (kr *kubernetesRestorer) Restore(restore *api.Restore, backup *api.Backup,
restore: restore,
prioritizedResources: prioritizedResources,
selector: selector,
- logger: log,
+ log: log,
dynamicFactory: kr.dynamicFactory,
fileSystem: kr.fileSystem,
namespaceClient: kr.namespaceClient,
@@ -324,7 +324,7 @@ type context struct {
restore *api.Restore
prioritizedResources []schema.GroupResource
selector labels.Selector
- logger logrus.FieldLogger
+ log logrus.FieldLogger
dynamicFactory client.DynamicFactory
fileSystem FileSystem
namespaceClient corev1.NamespaceInterface
@@ -338,16 +338,12 @@ type context struct {
pvRestorer PVRestorer
}

- func (ctx *context) infof(msg string, args ...interface{}) {
- ctx.logger.Infof(msg, args...)
- }

func (ctx *context) execute() (api.RestoreResult, api.RestoreResult) {
ctx.infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))
ctx.log.Infof("Starting restore of backup %s", kube.NamespaceAndName(ctx.backup))

dir, err := ctx.unzipAndExtractBackup(ctx.backupReader)
if err != nil {
ctx.infof("error unzipping and extracting: %v", err)
ctx.log.Infof("error unzipping and extracting: %v", err)
return api.RestoreResult{}, api.RestoreResult{Ark: []string{err.Error()}}
}
defer ctx.fileSystem.RemoveAll(dir)
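The hunk above also removes the `infof` helper (changelog entry #891): because every message funneled through one wrapper call site, caller-location reporting pointed at the helper's line rather than the real call site. Ark attaches file/line via its own logging hook, but logrus's built-in `ReportCaller` demonstrates the same effect in a minimal, self-contained sketch:

```go
package main

import (
	"os"

	"github.com/sirupsen/logrus"
)

// infof mimics the removed helper: a single wrapper that all messages
// pass through.
func infof(log logrus.FieldLogger, msg string, args ...interface{}) {
	log.Infof(msg, args...)
}

func main() {
	logger := logrus.New()
	logger.SetOutput(os.Stdout)
	logger.SetReportCaller(true) // include the caller's file:line in entries

	// Via the wrapper, every entry reports infof's own line, regardless of
	// where it was actually called from:
	infof(logger, "restoring %s", "pods")

	// Calling the logger directly reports this line, which is what the
	// commit restores by dropping the helper:
	logger.Infof("restoring %s", "pods")
}
```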
@@ -452,7 +448,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
nsPath := filepath.Join(nsSubDir, nsName)

if !namespaceFilter.ShouldInclude(nsName) {
ctx.infof("Skipping namespace %s", nsName)
ctx.log.Infof("Skipping namespace %s", nsName)
continue
}

@@ -467,7 +463,7 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if !existingNamespaces.Has(mappedNsName) {
- logger := ctx.logger.WithField("namespace", nsName)
+ logger := ctx.log.WithField("namespace", nsName)
ns := getNamespace(logger, filepath.Join(dir, api.ResourcesDir, "namespaces", api.ClusterScopedDir, nsName+".json"), mappedNsName)
if _, err := kube.EnsureNamespaceExists(ns, ctx.namespaceClient); err != nil {
addArkError(&errs, err)
@@ -485,15 +481,15 @@ func (ctx *context) restoreFromDir(dir string) (api.RestoreResult, api.RestoreRe
}

// TODO timeout?
ctx.logger.Debugf("Waiting on resource wait group for resource=%s", resource.String())
ctx.log.Debugf("Waiting on resource wait group for resource=%s", resource.String())
ctx.resourceWaitGroup.Wait()
ctx.logger.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
ctx.log.Debugf("Done waiting on resource wait group for resource=%s", resource.String())
}

// TODO timeout?
ctx.logger.Debug("Waiting on global wait group")
ctx.log.Debug("Waiting on global wait group")
waitErrs := ctx.globalWaitGroup.Wait()
ctx.logger.Debug("Done waiting on global wait group")
ctx.log.Debug("Done waiting on global wait group")

for _, err := range waitErrs {
// TODO not ideal to be adding these to Ark-level errors
@@ -579,14 +575,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
warnings, errs := api.RestoreResult{}, api.RestoreResult{}

if ctx.restore.Spec.IncludeClusterResources != nil && !*ctx.restore.Spec.IncludeClusterResources && namespace == "" {
ctx.infof("Skipping resource %s because it's cluster-scoped", resource)
ctx.log.Infof("Skipping resource %s because it's cluster-scoped", resource)
return warnings, errs
}

if namespace != "" {
ctx.infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
ctx.log.Infof("Restoring resource '%s' into namespace '%s' from: %s", resource, namespace, resourcePath)
} else {
ctx.infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
ctx.log.Infof("Restoring cluster level resource '%s' from: %s", resource, resourcePath)
}

files, err := ctx.fileSystem.ReadDir(resourcePath)
@@ -631,29 +627,20 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
continue
}

- if hasControllerOwner(obj.GetOwnerReferences()) {
- // non-pods with controller owners shouldn't be restored; pods with controller
- // owners should only be restored if they have restic snapshots to restore
- if groupResource != kuberesource.Pods || !restic.PodHasSnapshotAnnotation(obj) {
- ctx.infof("%s has a controller owner - skipping", kube.NamespaceAndName(obj))
- continue
- }
- }

complete, err := isCompleted(obj, groupResource)
if err != nil {
addToResult(&errs, namespace, fmt.Errorf("error checking completion %q: %v", fullPath, err))
continue
}
if complete {
ctx.infof("%s is complete - skipping", kube.NamespaceAndName(obj))
ctx.log.Infof("%s is complete - skipping", kube.NamespaceAndName(obj))
continue
}

if resourceClient == nil {
// initialize client for this Resource. we need
// metadata from an object to do this.
ctx.infof("Getting client for %v", obj.GroupVersionKind())
ctx.log.Infof("Getting client for %v", obj.GroupVersionKind())

resource := metav1.APIResource{
Namespaced: len(namespace) > 0,
@@ -672,15 +659,15 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a

// TODO: move to restore item action if/when we add a ShouldRestore() method to the interface
if groupResource == kuberesource.Pods && obj.GetAnnotations()[v1.MirrorPodAnnotationKey] != "" {
ctx.infof("Not restoring pod because it's a mirror pod")
ctx.log.Infof("Not restoring pod because it's a mirror pod")
continue
}

if groupResource == kuberesource.PersistentVolumes {
_, found := ctx.backup.Status.VolumeBackups[name]
reclaimPolicy, err := collections.GetString(obj.Object, "spec.persistentVolumeReclaimPolicy")
if err == nil && !found && reclaimPolicy == "Delete" {
ctx.infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")
ctx.log.Infof("Not restoring PV because it doesn't have a snapshot and its reclaim policy is Delete.")

ctx.pvsToProvision.Insert(name)

@@ -706,8 +693,8 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
go func() {
defer ctx.resourceWaitGroup.Done()

- if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.logger); err != nil {
- ctx.logger.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
+ if _, err := waitForReady(resourceWatch.ResultChan(), name, isPVReady, time.Minute, ctx.log); err != nil {
+ ctx.log.Warnf("Timeout reached waiting for persistent volume %s to become ready", name)
addArkError(&warnings, fmt.Errorf("timeout reached waiting for persistent volume %s to become ready", name))
}
}()
@@ -722,7 +709,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}

if volumeName, exists := spec["volumeName"]; exists && ctx.pvsToProvision.Has(volumeName.(string)) {
ctx.infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)
ctx.log.Infof("Resetting PersistentVolumeClaim %s/%s for dynamic provisioning because its PV %v has a reclaim policy of Delete", namespace, name, volumeName)

delete(spec, "volumeName")
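Resetting the claim this way hands it back to the dynamic provisioner. A hedged sketch of the idea, operating on a PVC as a plain unstructured map; the binding annotations removed here are an assumption about the code hidden behind the collapsed part of this hunk:

```go
package main

import "fmt"

func main() {
	// A restored PVC as an unstructured object, pared down for illustration.
	pvc := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]interface{}{
				// Assumption: these binding annotations must also be cleared
				// so the PV controller treats the claim as unbound.
				"pv.kubernetes.io/bind-completed":      "yes",
				"pv.kubernetes.io/bound-by-controller": "yes",
			},
		},
		"spec": map[string]interface{}{
			"volumeName":       "pv-0001",
			"storageClassName": "standard",
		},
	}

	spec := pvc["spec"].(map[string]interface{})
	delete(spec, "volumeName") // same call as in the hunk above

	annotations := pvc["metadata"].(map[string]interface{})["annotations"].(map[string]interface{})
	delete(annotations, "pv.kubernetes.io/bind-completed")
	delete(annotations, "pv.kubernetes.io/bound-by-controller")

	// With volumeName gone, the claim is eligible for dynamic provisioning.
	fmt.Println(pvc)
}
```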

@@ -738,10 +725,10 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
continue
}

ctx.infof("Executing item action for %v", &groupResource)
ctx.log.Infof("Executing item action for %v", &groupResource)

if logSetter, ok := action.ItemAction.(logging.LogSetter); ok {
- logSetter.SetLog(ctx.logger)
+ logSetter.SetLog(ctx.log)
}

updatedObj, warning, err := action.Execute(obj, ctx.restore)
@@ -770,26 +757,27 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a

// necessary because we may have remapped the namespace
// if the namespace is blank, don't create the key
+ originalNamespace := obj.GetNamespace()
if namespace != "" {
obj.SetNamespace(namespace)
}

// add an ark-restore label to each resource for easy ID
addLabel(obj, api.RestoreLabelKey, ctx.restore.Name)

ctx.infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
ctx.log.Infof("Restoring %s: %v", obj.GroupVersionKind().Kind, name)
createdObj, restoreErr := resourceClient.Create(obj)
if apierrors.IsAlreadyExists(restoreErr) {
fromCluster, err := resourceClient.Get(name, metav1.GetOptions{})
if err != nil {
ctx.infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("Error retrieving cluster version of %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
// Remove insubstantial metadata
fromCluster, err = resetMetadataAndStatus(fromCluster)
if err != nil {
ctx.infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("Error trying to reset metadata for %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
@@ -804,14 +792,14 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
case kuberesource.ServiceAccounts:
desired, err := mergeServiceAccounts(fromCluster, obj)
if err != nil {
ctx.infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("error merging secrets for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}

patchBytes, err := generatePatch(fromCluster, desired)
if err != nil {
ctx.infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
ctx.log.Infof("error generating patch for ServiceAccount %s: %v", kube.NamespaceAndName(obj), err)
addToResult(&warnings, namespace, err)
continue
}
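For an existing ServiceAccount, the backed-up secrets are merged into the cluster's copy and the difference is applied as a patch. A sketch of that flow using a two-way JSON merge patch; Ark's `generatePatch` and patch type may differ, so treat this as an approximation:

```go
package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

func main() {
	// fromCluster: the ServiceAccount currently in the cluster.
	fromCluster := []byte(`{"metadata":{"name":"default"},"secrets":[{"name":"default-token-abcde"}]}`)
	// desired: the cluster copy with the backed-up secrets merged in.
	desired := []byte(`{"metadata":{"name":"default"},"secrets":[{"name":"default-token-abcde"},{"name":"backed-up-token"}]}`)

	// Compute only the fields that changed between the two documents.
	patch, err := jsonpatch.CreateMergePatch(fromCluster, desired)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
	// Output: {"secrets":[{"name":"default-token-abcde"},{"name":"backed-up-token"}]}
}
```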
@@ -825,7 +813,7 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
if err != nil {
addToResult(&warnings, namespace, err)
} else {
ctx.infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
ctx.log.Infof("ServiceAccount %s successfully updated", kube.NamespaceAndName(obj))
}
default:
e := errors.Errorf("not restored: %s and is different from backed up version.", restoreErr)
@@ -836,24 +824,24 @@ func (ctx *context) restoreResource(resource, namespace, resourcePath string) (a
}
// Error was something other than an AlreadyExists
if restoreErr != nil {
ctx.infof("error restoring %s: %v", name, err)
ctx.log.Infof("error restoring %s: %v", name, err)
addToResult(&errs, namespace, fmt.Errorf("error restoring %s: %v", fullPath, restoreErr))
continue
}

if groupResource == kuberesource.Pods && len(restic.GetPodSnapshotAnnotations(obj)) > 0 {
if ctx.resticRestorer == nil {
ctx.logger.Warn("No restic restorer, not restoring pod's volumes")
ctx.log.Warn("No restic restorer, not restoring pod's volumes")
} else {
ctx.globalWaitGroup.GoErrorSlice(func() []error {
pod := new(v1.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil {
- ctx.logger.WithError(err).Error("error converting unstructured pod")
+ ctx.log.WithError(err).Error("error converting unstructured pod")
return []error{err}
}

- if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, ctx.logger); errs != nil {
- ctx.logger.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
+ if errs := ctx.resticRestorer.RestorePodVolumes(ctx.restore, pod, originalNamespace, ctx.log); errs != nil {
+ ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully complete restic restores of pod's volumes")
return errs
}

@@ -1084,7 +1072,7 @@ func (ctx *context) unmarshal(filePath string) (*unstructured.Unstructured, erro
func (ctx *context) unzipAndExtractBackup(src io.Reader) (string, error) {
gzr, err := gzip.NewReader(src)
if err != nil {
ctx.infof("error creating gzip reader: %v", err)
ctx.log.Infof("error creating gzip reader: %v", err)
return "", err
}
defer gzr.Close()
@@ -1097,7 +1085,7 @@
func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
dir, err := ctx.fileSystem.TempDir("", "")
if err != nil {
ctx.infof("error creating temp dir: %v", err)
ctx.log.Infof("error creating temp dir: %v", err)
return "", err
}

@@ -1108,7 +1096,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
break
}
if err != nil {
ctx.infof("error reading tar: %v", err)
ctx.log.Infof("error reading tar: %v", err)
return "", err
}

@@ -1118,15 +1106,15 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
case tar.TypeDir:
err := ctx.fileSystem.MkdirAll(target, header.FileInfo().Mode())
if err != nil {
ctx.infof("mkdirall error: %v", err)
ctx.log.Infof("mkdirall error: %v", err)
return "", err
}

case tar.TypeReg:
// make sure we have the directory created
err := ctx.fileSystem.MkdirAll(filepath.Dir(target), header.FileInfo().Mode())
if err != nil {
ctx.infof("mkdirall error: %v", err)
ctx.log.Infof("mkdirall error: %v", err)
return "", err
}

@@ -1138,7 +1126,7 @@ func (ctx *context) readBackup(tarRdr *tar.Reader) (string, error) {
defer file.Close()

if _, err := io.Copy(file, tarRdr); err != nil {
ctx.infof("error copying: %v", err)
ctx.log.Infof("error copying: %v", err)
return "", err
}
}
(The remaining 3 of the 8 changed files are not shown.)
