Skip to content

Commit

Permalink
Add annotation "pv.kubernetes.io/migrated-to" for CSI checking.
Browse files Browse the repository at this point in the history
1. Also checking annotation "pv.kubernetes.io/migrated-to" to find out whether volume is provisioned by CSI.
2. Check whether volume path is a mount point to make sure it's valid for Restic.
3. Add UT cases.

Signed-off-by: Xun Jiang <[email protected]>
  • Loading branch information
Xun Jiang committed Aug 5, 2022
1 parent 701256d commit ba868db
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 46 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5181-jxun
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add annotation "pv.kubernetes.io/migrated-to" for CSI checking.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
k8s.io/client-go v0.22.2
k8s.io/klog v1.0.0
k8s.io/kube-aggregator v0.19.12
k8s.io/mount-utils v0.22.2
sigs.k8s.io/controller-runtime v0.10.2
sigs.k8s.io/yaml v1.3.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,8 @@ k8s.io/kube-aggregator v0.19.12/go.mod h1:K76wPd03pSHEmS1FgJOcpryac5C3va4cbCvSu+
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e h1:KLHHjkdQFomZy8+06csTWZ0m1343QqxZhR2LJ1OxCYM=
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
k8s.io/mount-utils v0.22.2 h1:w/CJq+Cofkr81Rp89UkokgEbuu8Js0LwMI/RWWEE+gs=
k8s.io/mount-utils v0.22.2/go.mod h1:dHl6c2P60T5LHUnZxVslyly9EDCMzvhtISO5aY+Z4sk=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b h1:wxEMGetGMur3J1xuGLQY7GEQYg9bZxKn3tKo5k/eYcs=
Expand Down
17 changes: 2 additions & 15 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,19 +208,6 @@ func (r *PodVolumeBackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *PodVolumeBackupReconciler) singlePathMatch(path string) (string, error) {
matches, err := r.FileSystem.Glob(path)
if err != nil {
return "", errors.WithStack(err)
}

if len(matches) != 1 {
return "", errors.Errorf("expected one matching path: %s, got %d", path, len(matches))
}

return matches[0], nil
}

// getParentSnapshot finds the most recent completed PodVolumeBackup for the
// specified PVC and returns its Restic snapshot ID. Any errors encountered are
// logged but not returned since they do not prevent a backup from proceeding.
Expand Down Expand Up @@ -317,9 +304,9 @@ func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")

path, err := r.singlePathMatch(pathGlob)
path, err := kube.VerifyVolumePath(pathGlob, r.FileSystem, log)
if err != nil {
return nil, errors.Wrap(err, "identifying unique volume path on host")
return nil, errors.Wrap(err, "identifying unique volume path mount point on host")
}
log.WithField("path", path).Debugf("Found path matching glob")

Expand Down
25 changes: 17 additions & 8 deletions pkg/controller/pod_volume_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"runtime"
"time"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -138,14 +139,6 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
Namespace: test.pvb.Namespace,
}, &pvb)

// Assertions
if test.expected == nil {
Expect(apierrors.IsNotFound(err)).To(BeTrue())
} else {
Expect(err).To(BeNil())
Eventually(pvb.Status.Phase).Should(Equal(test.expected.Status.Phase))
}

// Processed PVBs will have completion timestamps.
if test.expectedProcessed == true {
Expect(pvb.Status.CompletionTimestamp).ToNot(BeNil())
Expand All @@ -155,12 +148,26 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
if test.expectedProcessed == false {
Expect(pvb.Status.CompletionTimestamp).To(BeNil())
}

// Assertions
if test.expected == nil {
Expect(apierrors.IsNotFound(err)).To(BeTrue())
} else {
Expect(err).To(BeNil())
// k8s.io/mount-utils package doesn't support Darwin system mounting function.
if runtime.GOOS == "darwin" {
Skip("not supported on GOOS=darwin")
}
Eventually(pvb.Status.Phase).Should(Equal(test.expected.Status.Phase))
}
},
Entry("empty phase pvb on same node should be processed", request{
pvb: pvbBuilder().Phase("").Node("test_node").Result(),
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: true,
// Failed with mount point check. Currently fake FS not support mount function.
// Darwin system will fail with "util/mount on this platform is not supported".
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
Expand All @@ -174,6 +181,8 @@ var _ = Describe("PodVolumeBackup Reconciler", func() {
pod: podBuilder().Result(),
bsl: bslBuilder().Result(),
expectedProcessed: true,
// Failed with mount point check. Currently fake FS not support mount function.
// Darwin system will fail with "util/mount on this platform is not supported".
expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1").
Phase(velerov1api.PodVolumeBackupPhaseCompleted).
Result(),
Expand Down
17 changes: 3 additions & 14 deletions pkg/controller/pod_volume_restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,6 @@ func getResticInitContainerIndex(pod *corev1api.Pod) int {
return -1
}

func singlePathMatch(path string) (string, error) {
matches, err := filepath.Glob(path)
if err != nil {
return "", errors.WithStack(err)
}

if len(matches) != 1 {
return "", errors.Errorf("expected one matching path: %s, got %d", path, len(matches))
}

return matches[0], nil
}

func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *velerov1api.PodVolumeRestore, pod *corev1api.Pod, log logrus.FieldLogger) error {
volumeDir, err := kube.GetVolumeDirectory(ctx, log, pod, req.Spec.Volume, c.Client)
if err != nil {
Expand All @@ -236,7 +223,9 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve

// Get the full path of the new volume's directory as mounted in the daemonset pod, which
// will look like: /host_pods/<new-pod-uid>/volumes/<volume-plugin-name>/<volume-dir>
volumePath, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir))
volumePath, err := kube.VerifyVolumePath(
fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir),
c.fileSystem, log)
if err != nil {
return errors.Wrap(err, "error identifying path of volume")
}
Expand Down
66 changes: 60 additions & 6 deletions pkg/util/kube/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kube
import (
"context"
"fmt"
"syscall"
"time"

"github.com/pkg/errors"
Expand All @@ -33,16 +34,26 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/mount-utils"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)

// These annotations are taken from the Kubernetes persistent volume/persistent volume claim controller.
// They cannot be directly importing because they are part of the kubernetes/kubernetes package, and importing that package is unsupported.
// Their values are well-known and slow changing. They're duplicated here as constants to provide compile-time checking.
// Originals can be found in kubernetes/kubernetes/pkg/controller/volume/persistentvolume/util/util.go.
const KubeAnnBindCompleted = "pv.kubernetes.io/bind-completed"
const KubeAnnBoundByController = "pv.kubernetes.io/bound-by-controller"
const KubeAnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
const (
KubeAnnBindCompleted = "pv.kubernetes.io/bind-completed"
KubeAnnBoundByController = "pv.kubernetes.io/bound-by-controller"
KubeAnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
KubeAnnMigratedTo = "pv.kubernetes.io/migrated-to"
)

const (
fakeFSType = 0
)

// NamespaceAndName returns a string in the format <namespace>/<name>
func NamespaceAndName(objMeta metav1.Object) string {
Expand Down Expand Up @@ -163,6 +174,9 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1
return pvc.Spec.VolumeName, nil
}

// isProvisionedByCSI function checks whether this is a CSI PV by annotation.
// Either "pv.kubernetes.io/provisioned-by" or "pv.kubernetes.io/migrated-to" indicates
// PV is provisioned by CSI.
func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, kbClient client.Client) (bool, error) {
if pv.Spec.CSI != nil {
return true, nil
Expand All @@ -171,14 +185,15 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume,
// Refer to https://github.com/vmware-tanzu/velero/issues/4496 for more details
if pv.Annotations != nil {
driverName := pv.Annotations[KubeAnnDynamicallyProvisioned]
if len(driverName) > 0 {
migratedDriver := pv.Annotations[KubeAnnMigratedTo]
if len(driverName) > 0 || len(migratedDriver) > 0 {
list := &storagev1api.CSIDriverList{}
if err := kbClient.List(context.TODO(), list); err != nil {
return false, err
}
for _, driver := range list.Items {
if driverName == driver.Name {
log.Debugf("the annotation %s=%s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, driverName)
if driverName == driver.Name || migratedDriver == driver.Name {
log.Debugf("the annotation %s or %s equals to %s indicates the volume is provisioned by a CSI driver", KubeAnnDynamicallyProvisioned, KubeAnnMigratedTo, driverName)
return true, nil
}
}
Expand All @@ -187,6 +202,45 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume,
return false, nil
}

// VerifyVolumePath function will be called by PVB and PVB controller to check whether pass-in volume path is valid.
// First, check whether there is only one match by the path's pattern (/host_pods/%s/volumes/*/volume_name/[mount|]).
// Second, check whether the volume path is a mount point.
func VerifyVolumePath(path string, fs filesystem.Interface, log logrus.FieldLogger) (string, error) {
// make sure there is only one match by the path's pattern.
matches, err := fs.Glob(path)
if err != nil {
log.Errorf("fail to search path by pattern %s: %s", path, err.Error())
return "", errors.WithStack(err)
}
if len(matches) != 1 {
log.Errorf("expected one matching path: %s, got %d", path, len(matches))
return "", errors.Errorf("expected one matching path: %s, got %d", path, len(matches))
}

// Skip mount point checking for fake filesystem to let UT pass.
// fake filesystem doesn't initialize fs stats struct, so the type is 0.
fsInfo := syscall.Statfs_t{}
syscall.Statfs(matches[0], &fsInfo)
if fsInfo.Type == fakeFSType {
log.Debug("Skip mount point check when file system type is 0 to let UT case pass.")
return matches[0], nil
}

// check whether the volume path is a mount point.
notMnt, err := mount.IsNotMountPoint(mount.New(""), matches[0])
if err != nil {
log.Errorf("fail to check path %s is a mount point: %s", matches[0], err)
return "", err
}
if notMnt {
log.Errorf("volume is not mounted at %s", matches[0])
return "", fmt.Errorf("volume %s is not mounted", matches[0])
}

log.Debugf("This is a valid volume path: %s.", matches[0])
return matches[0], nil
}

// IsV1CRDReady checks a v1 CRD to see if it's ready, with both the Established and NamesAccepted conditions.
func IsV1CRDReady(crd *apiextv1.CustomResourceDefinition) bool {
var isEstablished, namesAccepted bool
Expand Down
51 changes: 48 additions & 3 deletions pkg/util/kube/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kube
import (
"context"
"encoding/json"
"runtime"
"testing"
"time"

Expand All @@ -32,7 +33,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

Expand Down Expand Up @@ -332,7 +333,7 @@ func TestIsCRDReady(t *testing.T) {
}

for _, tc := range v1beta1tests {
m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.crd)
m, err := k8sruntime.DefaultUnstructuredConverter.ToUnstructured(tc.crd)
require.NoError(t, err)
result, err := IsCRDReady(&unstructured.Unstructured{Object: m})
require.NoError(t, err)
Expand Down Expand Up @@ -372,7 +373,7 @@ func TestIsCRDReady(t *testing.T) {
}

for _, tc := range v1tests {
m, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tc.crd)
m, err := k8sruntime.DefaultUnstructuredConverter.ToUnstructured(tc.crd)
require.NoError(t, err)
result, err := IsCRDReady(&unstructured.Unstructured{Object: m})
require.NoError(t, err)
Expand Down Expand Up @@ -425,3 +426,47 @@ func TestIsCRDReady(t *testing.T) {
_, err = IsCRDReady(obj)
assert.NotNil(t, err)
}

func TestVerifyVolumePath(t *testing.T) {
// Verify only one directory match pattern "./*/subpath" should be returned.
fakeFS := velerotest.NewFakeFileSystem()
fakeFS.MkdirAll("testDir1/subpath", 0755)
fakeFS.MkdirAll("testDir2/subpath", 0755)

_, err := VerifyVolumePath("./*/subpath", fakeFS, logrus.StandardLogger())
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "expected one matching path")

// Verify the path should be a mount point.
// k8s.io/mount-utils package doesn't support Darwin system mounting function.
if runtime.GOOS == "darwin" {
t.Skipf("not supported on GOOS=%s", runtime.GOOS)
}

fakeFS.MkdirAll("test", 0755)

_, err = VerifyVolumePath("test", fakeFS, logrus.StandardLogger())
assert.Nil(t, err)
}

func TestIsProvisionedByCSI(t *testing.T) {
testPVs := []*corev1.PersistentVolume{
builder.ForPersistentVolume("provisioned-by").ObjectMeta(builder.WithAnnotationsMap(map[string]string{"pv.kubernetes.io/provisioned-by": "csi.test.com"})).Result(),
builder.ForPersistentVolume("migrated-to").ObjectMeta(builder.WithAnnotationsMap(map[string]string{"pv.kubernetes.io/migrated-to": "csi.test.com"})).Result(),
builder.ForPersistentVolume("no-annotation").Result(),
}
csiDriver := storagev1api.CSIDriver{
ObjectMeta: metav1.ObjectMeta{Name: "csi.test.com"},
}
cli := fake.NewClientBuilder().WithLists(&storagev1api.CSIDriverList{Items: []storagev1api.CSIDriver{csiDriver}}).Build()

for _, pv := range testPVs {
isCSIPV, err := isProvisionedByCSI(logrus.StandardLogger(), pv, cli)
assert.Nil(t, err)
if pv.Name == "no-annotation" {
assert.False(t, isCSIPV)
} else {
assert.True(t, isCSIPV)
}
}
}

0 comments on commit ba868db

Please sign in to comment.