Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor resource generation #84

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 4 additions & 158 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -130,179 +129,26 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterReady bool) error {
// 1. create or update configmap <name>-cluster-state
if err := r.ensureClusterStateConfigMap(ctx, cluster, isClusterInitialized); err != nil {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, isClusterReady, r.Client, r.Scheme); err != nil {
return err
}
if err := r.ensureClusterService(ctx, cluster); err != nil {
if err := factory.CreateOrUpdateClusterService(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}
// 2. create or update statefulset
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}
// 3. create or update ClusterIP Service
if err := r.ensureClusterClientService(ctx, cluster); err != nil {
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}

return nil
}

func (r *EtcdClusterReconciler) ensureClusterService(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
svc := &corev1.Service{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: cluster.Name,
}, svc)
// Service exists, skip creation
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("cannot get cluster service: %w", err)
}

svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "peer", TargetPort: intstr.FromInt32(2380), Port: 2380, Protocol: corev1.ProtocolTCP},
{Name: "client", TargetPort: intstr.FromInt32(2379), Port: 2379, Protocol: corev1.ProtocolTCP},
},
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "None",
Selector: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
PublishNotReadyAddresses: true,
},
}
if err = ctrl.SetControllerReference(cluster, svc, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err = r.Create(ctx, svc); err != nil {
return fmt.Errorf("cannot create cluster service: %w", err)
}
return nil
}

func (r *EtcdClusterReconciler) ensureClusterClientService(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
svc := &corev1.Service{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: r.getClientServiceName(cluster),
}, svc)
// Service exists, skip creation
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("cannot get cluster client service: %w", err)
}

svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: r.getClientServiceName(cluster),
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "client", TargetPort: intstr.FromInt32(2379), Port: 2379, Protocol: corev1.ProtocolTCP},
},
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
}
if err = ctrl.SetControllerReference(cluster, svc, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err = r.Create(ctx, svc); err != nil {
return fmt.Errorf("cannot create cluster client service: %w", err)
}
return nil
}

// ensureClusterStateConfigMap creates or updates cluster state configmap.
func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
configMap := &corev1.ConfigMap{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: factory.GetClusterStateConfigMapName(cluster),
}, configMap)
// configmap exists, skip editing.
if err == nil {
if isClusterInitialized {
// update cluster state to existing
configMap.Data["ETCD_INITIAL_CLUSTER_STATE"] = "existing"
if err = r.Update(ctx, configMap); err != nil {
return fmt.Errorf("cannot update cluster state configmap: %w", err)
}
}
return nil
}

// configmap does not exist, create with cluster state "new"
if errors.IsNotFound(err) {
initialCluster := ""
for i := int32(0); i < *cluster.Spec.Replicas; i++ {
if i > 0 {
initialCluster += ","
}
initialCluster += fmt.Sprintf("%s-%d=https://%s-%d.%s.%s.svc:2380",
cluster.Name, i,
cluster.Name, i, cluster.Name, cluster.Namespace,
)
}

configMap = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: factory.GetClusterStateConfigMapName(cluster),
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": "new",
"ETCD_INITIAL_CLUSTER": initialCluster,
"ETCD_INITIAL_CLUSTER_TOKEN": cluster.Name + "-" + cluster.Namespace,
},
}
if err := ctrl.SetControllerReference(cluster, configMap, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err := r.Create(ctx, configMap); err != nil {
return fmt.Errorf("cannot create cluster state configmap: %w", err)
}
return nil
}

return fmt.Errorf("cannot get cluster state configmap: %w", err)
}

func (r *EtcdClusterReconciler) getClientServiceName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-client"
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
res, statusErr := r.updateStatus(ctx, cluster)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/etcdcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var _ = Describe("EtcdCluster Controller", func() {
svc = &v1.Service{}
clientSvcName := types.NamespacedName{
Namespace: typeNamespacedName.Namespace,
Name: controllerReconciler.getClientServiceName(etcdcluster),
Name: factory.GetClientServiceName(etcdcluster),
}
err = k8sClient.Get(ctx, clientSvcName, svc)
Expect(err).NotTo(HaveOccurred(), "cluster client Service should exist")
Expand Down
42 changes: 41 additions & 1 deletion internal/controller/factory/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func reconcileSTS(ctx context.Context, rclient client.Client, crdName string, sts *appsv1.StatefulSet) error {
func reconcileStatefulSet(ctx context.Context, rclient client.Client, crdName string, sts *appsv1.StatefulSet) error {
logger := log.FromContext(ctx)

currentSts := &appsv1.StatefulSet{}
Expand All @@ -47,3 +48,42 @@ func reconcileSTS(ctx context.Context, rclient client.Client, crdName string, st
sts.Status = currentSts.Status
return rclient.Update(ctx, sts)
}

func reconcileConfigMap(ctx context.Context, rclient client.Client, crdName string, configMap *corev1.ConfigMap) error {
logger := log.FromContext(ctx)

currentConfigMap := &corev1.ConfigMap{}
err := rclient.Get(ctx, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new configMap", "cm_name", configMap.Name, "crd_object", crdName)
return rclient.Create(ctx, configMap)
}
return fmt.Errorf("cannot get existing configMap: %s, for crd_object: %s, err: %w", configMap.Name, crdName, err)
}
configMap.Annotations = labels.Merge(currentConfigMap.Annotations, configMap.Annotations)
if configMap.ResourceVersion != "" {
configMap.ResourceVersion = currentConfigMap.ResourceVersion
}
return rclient.Update(ctx, configMap)
}

func reconcileService(ctx context.Context, rclient client.Client, crdName string, svc *corev1.Service) error {
logger := log.FromContext(ctx)

currentSvc := &corev1.Service{}
err := rclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, currentSvc)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new service", "svc_name", svc.Name, "crd_object", crdName)
return rclient.Create(ctx, svc)
}
return fmt.Errorf("cannot get existing service: %s, for crd_object: %s, err: %w", svc.Name, crdName, err)
}
svc.Annotations = labels.Merge(currentSvc.Annotations, svc.Annotations)
if svc.ResourceVersion != "" {
svc.ResourceVersion = currentSvc.ResourceVersion
}
svc.Status = currentSvc.Status
return rclient.Update(ctx, svc)
}
55 changes: 54 additions & 1 deletion internal/controller/factory/configMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,61 @@ limitations under the License.

package factory

import etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
)

func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-cluster-state"
}

func CreateOrUpdateClusterStateConfigMap(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
isClusterReady bool,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
initialCluster := ""
for i := int32(0); i < *cluster.Spec.Replicas; i++ {
if i > 0 {
initialCluster += ","
}
initialCluster += fmt.Sprintf("%s-%d=https://%s-%d.%s.%s.svc:2380",
cluster.Name, i,
cluster.Name, i, cluster.Name, cluster.Namespace,
)
}

configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: GetClusterStateConfigMapName(cluster),
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": "new",
"ETCD_INITIAL_CLUSTER": initialCluster,
"ETCD_INITIAL_CLUSTER_TOKEN": cluster.Name + "-" + cluster.Namespace,
},
}

if isClusterReady {
// update cluster state to existing
configMap.Data["ETCD_INITIAL_CLUSTER_STATE"] = "existing"
}

if err := ctrl.SetControllerReference(cluster, configMap, rscheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcileConfigMap(ctx, rclient, cluster.Name, configMap)
}
Loading