Skip to content

Commit

Permalink
Add resizing operation
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Kvapil <[email protected]>
  • Loading branch information
kvaps committed Jul 18, 2024
1 parent d3f1892 commit 0976d1e
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 1 deletion.
15 changes: 15 additions & 0 deletions charts/etcd-operator/templates/rbac/clusterrole-manager-role.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- list
- patch
- watch
- apiGroups:
- "storage.k8s.io"
resources:
- storageclasses
verbs:
- get
- apiGroups:
- etcd.aenix.io
resources:
Expand Down
15 changes: 15 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- list
- patch
- watch
- apiGroups:
- "storage.k8s.io"
resources:
- storageclasses
verbs:
- get
- apiGroups:
- etcd.aenix.io
resources:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
35 changes: 35 additions & 0 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/aenix-io/etcd-operator/internal/controller/factory"

clientv3 "go.etcd.io/etcd/client/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
Expand All @@ -67,6 +68,8 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="policy",resources=poddisruptionbudgets,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;patch;watch
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get

// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -146,6 +149,11 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
factory.FillConditions(instance)
}

// if size is different we have to remove statefulset it will be recreated in the next step
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, instance); err != nil {
return ctrl.Result{}, err
}

// ensure managed resources
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
Expand Down Expand Up @@ -199,6 +207,28 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed.
func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error {
for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates {
if volumeClaimTemplate.Name != "data" {
continue
}
currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
if desiredStorage.Cmp(currentStorage) != 0 {
deletePolicy := metav1.DeletePropagationOrphan
log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name)
err := r.Delete(ctx, &state.statefulSet, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
log.Error(ctx, err, "Failed to delete StatefulSet")
return err
}
return nil
}
}
return nil
}

// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
Expand All @@ -213,6 +243,11 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
log.Error(ctx, err, "reconcile statefulset failed")
return err
}

if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile persistentVolumeClaims failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

return nil
Expand Down
68 changes: 67 additions & 1 deletion internal/controller/factory/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ limitations under the License.

package factory

import etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
)

func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
if len(cluster.Spec.Storage.VolumeClaimTemplate.Name) > 0 {
Expand All @@ -25,3 +37,57 @@ func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
//nolint:goconst
return "data"
}

// UpdatePersistentVolumeClaims checks and updates the sizes of PVCs in an EtcdCluster if the specified storage size is larger than the current.
func UpdatePersistentVolumeClaims(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, rclient client.Client) error {
labelSelector := labels.SelectorFromSet(labels.Set{
"app.kubernetes.io/name": cluster.Name,
})
listOptions := &client.ListOptions{
Namespace: cluster.Namespace,
LabelSelector: labelSelector,
}

// List all PVCs in the same namespace as the cluster using the label selector
pvcList := &corev1.PersistentVolumeClaimList{}
err := rclient.List(ctx, pvcList, listOptions)
if err != nil {
return fmt.Errorf("failed to list PVCs: %w", err)
}

// Desired size from the cluster spec
expectedPrefix := fmt.Sprintf("data-%s-", cluster.Name)
desiredSize := cluster.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]

for _, pvc := range pvcList.Items {
// Skip if PVC name does not match expected prefix
if !strings.HasPrefix(pvc.Name, expectedPrefix) {
continue
}

// Skip if specified StorageClass does not support volume expansion
if pvc.Spec.StorageClassName != nil {
sc := &storagev1.StorageClass{}
scName := *pvc.Spec.StorageClassName
err := rclient.Get(ctx, types.NamespacedName{Name: scName}, sc)
if err != nil {
return fmt.Errorf("failed to get StorageClass '%s' for PVC '%s': %w", scName, pvc.Name, err)
}
if sc.AllowVolumeExpansion == nil || !*sc.AllowVolumeExpansion {
continue
}
}

currentSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
// Only patch if the desired size is greater than the current size
if desiredSize.Cmp(currentSize) == 1 {
newSizePatch := []byte(fmt.Sprintf(`{"spec": {"resources": {"requests": {"storage": "%s"}}}}`, desiredSize.String()))
err = rclient.Patch(ctx, &pvc, client.RawPatch(types.StrategicMergePatchType, newSizePatch))
if err != nil {
return fmt.Errorf("failed to patch PVC %s for updated size: %w", pvc.Name, err)
}
}
}

return nil
}
157 changes: 157 additions & 0 deletions internal/controller/factory/pvc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2024 The etcd-operator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"context"

"k8s.io/apimachinery/pkg/api/resource"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
storagev1 "k8s.io/api/storage/v1"
)

var _ = Describe("UpdatePersistentVolumeClaims", func() {
var (
ns *corev1.Namespace
ctx context.Context
cluster *etcdaenixiov1alpha1.EtcdCluster
fakeClient client.Client
)

BeforeEach(func() {
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test-namespace",
},
}
ctx = context.TODO()
cluster = &etcdaenixiov1alpha1.EtcdCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: ns.Name,
},
Spec: etcdaenixiov1alpha1.EtcdClusterSpec{
Storage: etcdaenixiov1alpha1.StorageSpec{
VolumeClaimTemplate: etcdaenixiov1alpha1.EmbeddedPersistentVolumeClaim{
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("10Gi"),
},
},
},
},
},
},
}

// Setting up the fake client
fakeClient = fake.NewClientBuilder().WithObjects(ns, cluster).Build()
})

Context("when updating PVC sizes", func() {
It("should handle no PVCs found correctly", func() {
Expect(UpdatePersistentVolumeClaims(ctx, cluster, fakeClient)).Should(Succeed())
})

It("should update PVC if the desired size is larger", func() {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "data-test-cluster-0",
Namespace: ns.Name,
Labels: map[string]string{
"app.kubernetes.io/name": cluster.Name,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: stringPointer("test-storage-class"),
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("5Gi"),
},
},
},
}
Expect(fakeClient.Create(ctx, pvc)).Should(Succeed())

sc := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "test-storage-class",
},
AllowVolumeExpansion: boolPointer(true),
}
Expect(fakeClient.Create(ctx, sc)).Should(Succeed())

Expect(UpdatePersistentVolumeClaims(ctx, cluster, fakeClient)).Should(Succeed())

updatedPVC := &corev1.PersistentVolumeClaim{}
Expect(fakeClient.Get(ctx, types.NamespacedName{Name: pvc.Name, Namespace: ns.Name}, updatedPVC)).Should(Succeed())
Expect(updatedPVC.Spec.Resources.Requests[corev1.ResourceStorage]).To(Equal(resource.MustParse("10Gi")))
})

It("should skip updating PVC if StorageClass does not allow expansion", func() {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "data-test-cluster-0",
Namespace: ns.Name,
Labels: map[string]string{
"app.kubernetes.io/name": cluster.Name,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: stringPointer("non-expandable-storage-class"),
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("5Gi"),
},
},
},
}
Expect(fakeClient.Create(ctx, pvc)).Should(Succeed())

sc := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: "non-expandable-storage-class",
},
AllowVolumeExpansion: boolPointer(false),
}
Expect(fakeClient.Create(ctx, sc)).Should(Succeed())

Expect(UpdatePersistentVolumeClaims(ctx, cluster, fakeClient)).Should(Succeed())
unchangedPVC := &corev1.PersistentVolumeClaim{}
Expect(fakeClient.Get(ctx, types.NamespacedName{Name: pvc.Name, Namespace: ns.Name}, unchangedPVC)).Should(Succeed())
Expect(unchangedPVC.Spec.Resources.Requests[corev1.ResourceStorage]).To(Equal(resource.MustParse("5Gi")))
})
})
})

func stringPointer(s string) *string {
return &s
}

func boolPointer(b bool) *bool {
return &b
}

0 comments on commit 0976d1e

Please sign in to comment.