Skip to content

Commit

Permalink
Merge pull request #15 from replicatedhq/laverya/implement-swap-defau…
Browse files Browse the repository at this point in the history
…lt-storageclass-functionality

implement flag to allow changing the default StorageClass after migration completion
  • Loading branch information
laverya authored Jul 23, 2021
2 parents 062451d + 5fe5e17 commit c261b79
Show file tree
Hide file tree
Showing 2 changed files with 488 additions and 6 deletions.
126 changes: 120 additions & 6 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package migrate
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,6 +29,10 @@ const sourceNsAnnotation = baseAnnotation + "-sourcens"
const sourcePvcAnnotation = baseAnnotation + "-sourcepvc"
const desiredReclaimAnnotation = baseAnnotation + "-reclaim"

// IsDefaultStorageClassAnnotation - this is also exported by https://github.com/kubernetes/kubernetes/blob/v1.21.3/pkg/apis/storage/v1/util/helpers.go#L25
// but that would require adding the k8s import overrides to our go.mod
const IsDefaultStorageClassAnnotation = "storageclass.kubernetes.io/is-default-class"

// Cli uses CLI options to run Migrate
func Cli() {
var sourceSCName string
Expand Down Expand Up @@ -104,10 +108,83 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
return fmt.Errorf("failed to scale up pods: %w", err)
}

if setDefaults {
err = swapDefaultStorageClasses(ctx, w, clientset, sourceSCName, destSCName)
if err != nil {
return fmt.Errorf("failed to change default StorageClass from %s to %s: %w", sourceSCName, destSCName, err)
}
}

w.Printf("\nSuccess!\n")
return nil
}

// swapDefaultStorageClasses attempts to set newDefaultSC as the default StorageClass
// if oldDefaultSC was set as the default, then it will be unset first
// if another StorageClass besides these two is currently the default, it will return an error
func swapDefaultStorageClasses(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, oldDefaultSC string, newDefaultSC string) error {
// check if any SC is currently set as default - if none is, skip the "remove existing default" step
scs, err := clientset.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list storageclasses: %w", err)
}

isDefaultSet := false
for _, sc := range scs.Items {
if sc.Annotations == nil {
continue
}
val, ok := sc.Annotations[IsDefaultStorageClassAnnotation]
if !ok || val != "true" {
continue
}
if sc.Name == newDefaultSC {
return nil // the currently default StorageClass is the one we want to be set as default, nothing left to do
}
if sc.Name != oldDefaultSC {
return fmt.Errorf("%s is not the default StorageClass", oldDefaultSC)
}
isDefaultSet = true
}

if isDefaultSet { // only unset the current default StorageClass if there is currently a default StorageClass
w.Printf("\nChanging default StorageClass from %s to %s\n", oldDefaultSC, newDefaultSC)
err = mutateSC(ctx, w, clientset, oldDefaultSC, func(sc *storagev1.StorageClass) *storagev1.StorageClass {
delete(sc.Annotations, IsDefaultStorageClassAnnotation)
return sc
}, func(sc *storagev1.StorageClass) bool {
_, ok := sc.Annotations[IsDefaultStorageClassAnnotation]
return !ok
})
if err != nil {
return fmt.Errorf("failed to unset StorageClass %s as default: %w", oldDefaultSC, err)
}
} else {
w.Printf("\nSetting %s as the default StorageClass\n", newDefaultSC)
}

err = mutateSC(ctx, w, clientset, newDefaultSC, func(sc *storagev1.StorageClass) *storagev1.StorageClass {
if sc.Annotations == nil {
sc.Annotations = map[string]string{IsDefaultStorageClassAnnotation: "true"}
} else {
sc.Annotations[IsDefaultStorageClassAnnotation] = "true"
}
return sc
}, func(sc *storagev1.StorageClass) bool {
if sc.Annotations == nil {
return false
}
val, ok := sc.Annotations[IsDefaultStorageClassAnnotation]
return ok && val == "true"
})
if err != nil {
return fmt.Errorf("failed to set StorageClass %s as default: %w", newDefaultSC, err)
}

w.Printf("Finished changing default StorageClass\n")
return nil
}

func copyAllPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, rsyncImage string, matchingPVCs map[string][]corev1.PersistentVolumeClaim, verboseCopy bool) error {
// create a pod for each PVC migration, and wait for it to finish
w.Printf("\nCopying data from %s PVCs to %s PVCs\n", sourceSCName, destSCName)
Expand Down Expand Up @@ -440,10 +517,6 @@ func newPvcName(originalName string) string {
return originalName + "-pvcmigrate"
}

func originalPvcName(newName string) string {
return strings.TrimSuffix(newName, "-pvcmigrate")
}

// get a PV, apply the selected mutator to the PV, update the PV, use the supplied validator to wait for the update to show up
func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume, checker func(volume *corev1.PersistentVolume) bool) error {
tries := 0
Expand Down Expand Up @@ -485,6 +558,47 @@ func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
}
}

// get a SC, apply the selected mutator to the SC, update the SC, use the supplied validator to wait for the update to show up
func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, scName string, mutator func(sc *storagev1.StorageClass) *storagev1.StorageClass, checker func(sc *storagev1.StorageClass) bool) error {
tries := 0
for {
sc, err := clientset.StorageV1().StorageClasses().Get(ctx, scName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get storage classes %s: %w", scName, err)
}

sc = mutator(sc)

_, err = clientset.StorageV1().StorageClasses().Update(ctx, sc, metav1.UpdateOptions{})
if err != nil {
if k8serrors.IsConflict(err) {
if tries > 5 {
return fmt.Errorf("failed to mutate SC %s: %w", scName, err)
}
w.Printf("Got conflict updating SC %s, waiting 5s to retry\n", scName)
time.Sleep(time.Second * 5)
tries++
continue
} else {
return fmt.Errorf("failed to mutate SC %s: %w", scName, err)
}
}
break
}

for {
sc, err := clientset.StorageV1().StorageClasses().Get(ctx, scName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get storage classes %s: %w", scName, err)
}

if checker(sc) {
return nil
}
time.Sleep(time.Second * 5)
}
}

// scaleDownPods scales down statefulsets & deployments controlling pods mounting PVCs in a supplied list
// it will also cleanup WIP migration pods it discovers that happen to be mounting a supplied PVC.
// if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
Expand Down
Loading

0 comments on commit c261b79

Please sign in to comment.