Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resizing operation #254

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/v1alpha1/etcdcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,26 @@ func (r *EtcdCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, er
etcdclusterlog.Info("validate update", "name", r.Name)
var warnings admission.Warnings
oldCluster := old.(*EtcdCluster)

// Check if replicas are being resized
if *oldCluster.Spec.Replicas != *r.Spec.Replicas {
warnings = append(warnings, "cluster resize is not currently supported")
}

var allErrors field.ErrorList

// Check if storage size is being decreased
oldStorage := oldCluster.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
newStorage := r.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
if newStorage.Cmp(oldStorage) < 0 {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "storage", "volumeClaimTemplate", "resources", "requests", "storage"),
newStorage.String(),
"decreasing storage size is not allowed"),
)
}

// Check if storage type is changing
if oldCluster.Spec.Storage.EmptyDir == nil && r.Spec.Storage.EmptyDir != nil ||
oldCluster.Spec.Storage.EmptyDir != nil && r.Spec.Storage.EmptyDir == nil {
allErrors = append(allErrors, field.Invalid(
Expand All @@ -121,6 +136,7 @@ func (r *EtcdCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, er
)
}

// Validate PodDisruptionBudget
pdbWarnings, pdbErr := r.validatePdb()
if pdbErr != nil {
allErrors = append(allErrors, pdbErr...)
Expand All @@ -129,11 +145,13 @@ func (r *EtcdCluster) ValidateUpdate(old runtime.Object) (admission.Warnings, er
warnings = append(warnings, pdbWarnings...)
}

// Validate Security
securityErr := r.validateSecurity()
if securityErr != nil {
allErrors = append(allErrors, securityErr...)
}

// Validate Options
if errOptions := validateOptions(r); errOptions != nil {
allErrors = append(allErrors, field.Invalid(
field.NewPath("spec", "options"),
Expand Down
40 changes: 40 additions & 0 deletions api/v1alpha1/etcdcluster_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,46 @@ var _ = Describe("EtcdCluster Webhook", func() {
}
})

It("Should reject decreasing storage size", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
Storage: StorageSpec{
VolumeClaimTemplate: EmbeddedPersistentVolumeClaim{
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: resource.MustParse("5Gi"),
},
},
},
},
},
},
}
oldCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Replicas: ptr.To(int32(1)),
Storage: StorageSpec{
VolumeClaimTemplate: EmbeddedPersistentVolumeClaim{
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.VolumeResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: resource.MustParse("10Gi"),
},
},
},
},
},
},
}
_, err := etcdCluster.ValidateUpdate(oldCluster)
if Expect(err).To(HaveOccurred()) {
statusErr := err.(*errors.StatusError)
Expect(statusErr.ErrStatus.Message).To(ContainSubstring("decreasing storage size is not allowed"))
}
})

It("Should allow changing emptydir size", func() {
etcdCluster := &EtcdCluster{
Spec: EtcdClusterSpec{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- list
- patch
- watch
- apiGroups:
- "storage.k8s.io"
resources:
- storageclasses
verbs:
- get
- list
- apiGroups:
- etcd.aenix.io
resources:
Expand Down
16 changes: 16 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- list
- patch
- watch
- apiGroups:
- ""
resources:
Expand Down Expand Up @@ -94,3 +103,10 @@ rules:
- patch
- update
- watch
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
verbs:
- get
- list
5 changes: 2 additions & 3 deletions examples/manifests/etcdcluster-persistent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ apiVersion: etcd.aenix.io/v1alpha1
kind: EtcdCluster
metadata:
name: test
namespace: default
spec:
replicas: 3
storage:
volumeClaimTemplate:
spec:
storageClassName: gp3
storageClassName: standard-with-expansion
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
storage: 4Gi
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
34 changes: 34 additions & 0 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="policy",resources=poddisruptionbudgets,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;patch;watch
// +kubebuilder:rbac:groups=storage.k8s.io,resources=storageclasses,verbs=get;list

// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -168,6 +170,11 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
)
}

// if size is different we have to remove statefulset it will be recreated in the next step
if err := r.checkAndDeleteStatefulSetIfNecessary(ctx, &state, instance); err != nil {
return ctrl.Result{}, err
}

// ensure managed resources
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
Expand Down Expand Up @@ -231,6 +238,28 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// checkAndDeleteStatefulSetIfNecessary deletes the StatefulSet if the specified storage size has changed.
func (r *EtcdClusterReconciler) checkAndDeleteStatefulSetIfNecessary(ctx context.Context, state *observables, instance *etcdaenixiov1alpha1.EtcdCluster) error {
for _, volumeClaimTemplate := range state.statefulSet.Spec.VolumeClaimTemplates {
if volumeClaimTemplate.Name != "data" {
continue
}
currentStorage := volumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
desiredStorage := instance.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]
if desiredStorage.Cmp(currentStorage) != 0 {
deletePolicy := metav1.DeletePropagationOrphan
log.Info(ctx, "Deleting StatefulSet due to storage change", "statefulSet", state.statefulSet.Name)
err := r.Delete(ctx, &state.statefulSet, &client.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
log.Error(ctx, err, "Failed to delete StatefulSet")
return err
}
return nil
}
}
return nil
}

// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
Expand All @@ -245,6 +274,11 @@ func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
log.Error(ctx, err, "reconcile statefulset failed")
return err
}

if err := factory.UpdatePersistentVolumeClaims(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile persistentVolumeClaims failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

return nil
Expand Down
68 changes: 67 additions & 1 deletion internal/controller/factory/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ limitations under the License.

package factory

import etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
)

func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
if len(cluster.Spec.Storage.VolumeClaimTemplate.Name) > 0 {
Expand All @@ -25,3 +37,57 @@ func GetPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
//nolint:goconst
return "data"
}

// UpdatePersistentVolumeClaims checks and updates the sizes of PVCs in an EtcdCluster if the specified storage size is larger than the current.
func UpdatePersistentVolumeClaims(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, rclient client.Client) error {
labelSelector := labels.SelectorFromSet(labels.Set{
"app.kubernetes.io/instance": cluster.Name,
})
listOptions := &client.ListOptions{
Namespace: cluster.Namespace,
LabelSelector: labelSelector,
}

// List all PVCs in the same namespace as the cluster using the label selector
pvcList := &corev1.PersistentVolumeClaimList{}
err := rclient.List(ctx, pvcList, listOptions)
if err != nil {
return fmt.Errorf("failed to list PVCs: %w", err)
}

// Desired size from the cluster spec
expectedPrefix := fmt.Sprintf("data-%s-", cluster.Name)
desiredSize := cluster.Spec.Storage.VolumeClaimTemplate.Spec.Resources.Requests[corev1.ResourceStorage]

for _, pvc := range pvcList.Items {
// Skip if PVC name does not match expected prefix
if !strings.HasPrefix(pvc.Name, expectedPrefix) {
continue
}

// Skip if specified StorageClass does not support volume expansion
if pvc.Spec.StorageClassName != nil {
sc := &storagev1.StorageClass{}
scName := *pvc.Spec.StorageClassName
err := rclient.Get(ctx, types.NamespacedName{Name: scName}, sc)
if err != nil {
return fmt.Errorf("failed to get StorageClass '%s' for PVC '%s': %w", scName, pvc.Name, err)
}
if sc.AllowVolumeExpansion == nil || !*sc.AllowVolumeExpansion {
continue
}
}

currentSize := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
// Only patch if the desired size is greater than the current size
if desiredSize.Cmp(currentSize) == 1 {
newSizePatch := []byte(fmt.Sprintf(`{"spec": {"resources": {"requests": {"storage": "%s"}}}}`, desiredSize.String()))
err = rclient.Patch(ctx, &pvc, client.RawPatch(types.StrategicMergePatchType, newSizePatch))
if err != nil {
return fmt.Errorf("failed to patch PVC %s for updated size: %w", pvc.Name, err)
}
}
}

return nil
}
Loading