diff --git a/internal/controller/node_controller.go b/internal/controller/node_controller.go index 47588a5..4405ff8 100644 --- a/internal/controller/node_controller.go +++ b/internal/controller/node_controller.go @@ -5,7 +5,6 @@ import ( "crypto/sha1" "encoding/hex" "fmt" - "hash/crc32" "io" "github.com/topolvm/pie/constants" @@ -265,46 +264,6 @@ func (r *NodeReconciler) createOrUpdatePVC( return nil } -// CronJob name should be less than or equal to 52 characters. -// cf. https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ -// One CronJob is created per node and a StorageClass. -// It is desirable that the names of nodes and StorageClasses can be inferred from the names of CronJobs. -// However, if the node and StorageClass names are too long, the CronJob name will not fit in 52 characters. -// So we cut off the node and StorageClass names to an appropriate length and added a hash value at the end -// to balance readability and uniqueness. -func getCronJobName(kind int, nodeNamePtr *string, storageClass string) string { - nodeName := "" - if nodeNamePtr != nil { - nodeName = *nodeNamePtr - } - - sha1 := sha1.New() - io.WriteString(sha1, nodeName+"\000"+storageClass) - hashedName := hex.EncodeToString(sha1.Sum(nil)) - - maxNodeNameLength := 15 - maxStorageClassNameLength := 18 - if kind != Probe { - maxNodeNameLength = 13 - maxStorageClassNameLength = 15 - } - - if len(nodeName) > maxNodeNameLength { - nodeName = nodeName[:maxNodeNameLength] - } - if len(storageClass) > maxStorageClassNameLength { - storageClass = storageClass[:maxStorageClassNameLength] - } - - if kind == Probe { - return fmt.Sprintf("%s-%s-%s-%s", constants.ProbeNamePrefix, nodeName, storageClass, hashedName[:6]) - } else if kind == ProvisionProbe { - return fmt.Sprintf("%s-%s-%s", constants.ProvisionProbeNamePrefix, storageClass, hashedName[:6]) - } else { // kind == MountProbe - return fmt.Sprintf("%s-%s-%s-%s", constants.MountProbeNamePrefix, nodeName, storageClass, hashedName[:6]) - } -} - func (r *NodeReconciler) deleteCronJob(ctx context.Context, cronJobName string) error { var cronJobForDelete batchv1.CronJob err := r.client.Get(ctx, client.ObjectKey{Namespace: r.namespace, Name: cronJobName}, &cronJobForDelete) @@ -328,196 +287,6 @@ func (r *NodeReconciler) deleteCronJob(ctx context.Context, cronJobName string) return nil } -func makeCronSchedule(storageClass string, nodeNamePtr *string, period int) string { - nodeName := "" - if nodeNamePtr != nil { - nodeName = *nodeNamePtr - } - - h := crc32.NewIEEE() - h.Write([]byte(storageClass)) - h.Write([]byte(nodeName)) - - return fmt.Sprintf("%d-59/%d * * * *", h.Sum32()%uint32(period), period) -} - -func addPodFinalizer(spec *corev1.PodTemplateSpec) { - finalizers := spec.GetFinalizers() - for _, finalizer := range finalizers { - if finalizer == constants.PodFinalizerName { - return - } - } - spec.SetFinalizers(append(finalizers, constants.PodFinalizerName)) -} - -func createOrUpdateJob( - ctx context.Context, - client client.Client, - kind int, - storageClass string, - namespace string, - containerImage string, - controllerURL string, - probePeriod int, - nodeName *string, -) error { - logger := log.FromContext(ctx) - logger.Info("createOrUpdateJob") - defer logger.Info("createOrUpdateJob Finished") - - cronjob := &batchv1.CronJob{} - cronjob.SetNamespace(namespace) - cronjob.SetName(getCronJobName(kind, nodeName, storageClass)) - - op, err := ctrl.CreateOrUpdate(ctx, client, cronjob, func() error { - label := map[string]string{ - 
constants.ProbeStorageClassLabelKey: storageClass, - } - if nodeName != nil { - label[constants.ProbeNodeLabelKey] = *nodeName - } - cronjob.SetLabels(label) - - cronjob.Spec.ConcurrencyPolicy = batchv1.ForbidConcurrent - cronjob.Spec.Schedule = makeCronSchedule(storageClass, nodeName, probePeriod) - - var successfulJobsHistoryLimit = int32(0) - cronjob.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit - - // according this doc https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#jobspec-v1-batch, - // selector is set by the system - - cronjob.Spec.JobTemplate.Spec.Template.SetLabels(label) - - addPodFinalizer(&cronjob.Spec.JobTemplate.Spec.Template) - - if len(cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers) != 1 { - cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers = []corev1.Container{{}} - } - - volumeName := "genericvol" - container := &cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers[0] - container.Name = constants.ProbeContainerName - container.Image = containerImage - - switch kind { - case Probe: - container.VolumeMounts = []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: "/mounted", - }, - } - container.Args = []string{ - "probe", - fmt.Sprintf("--destination-address=%s", controllerURL), - "--path=/mounted/", - fmt.Sprintf("--node-name=%s", *nodeName), - fmt.Sprintf("--storage-class=%s", storageClass), - } - - case MountProbe: - container.VolumeMounts = []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: "/mounted", - }, - } - container.Args = []string{ - "probe", - "--mount-probe", - fmt.Sprintf("--destination-address=%s", controllerURL), - "--path=/mounted/", - fmt.Sprintf("--node-name=%s", *nodeName), - fmt.Sprintf("--storage-class=%s", storageClass), - } - - case ProvisionProbe: - container.Args = []string{ - "provision-probe", - } - } - - var userID int64 = 1001 - var groupID int64 = 1001 - cronjob.Spec.JobTemplate.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{ - RunAsUser: &userID, - RunAsGroup: &groupID, - FSGroup: &groupID, - } - - cronjob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever - var periodSeconds int64 = 5 - cronjob.Spec.JobTemplate.Spec.Template.Spec.TerminationGracePeriodSeconds = &periodSeconds - - if kind == Probe || kind == MountProbe { - cronjob.Spec.JobTemplate.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Key: corev1.LabelHostname, - Operator: corev1.NodeSelectorOpIn, - Values: []string{*nodeName}, - }, - }, - }, - }, - }, - }, - } - } - - if kind == Probe || kind == ProvisionProbe { - cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{ - { - Name: volumeName, - VolumeSource: corev1.VolumeSource{ - Ephemeral: &corev1.EphemeralVolumeSource{ - VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{ - Spec: corev1.PersistentVolumeClaimSpec{ - AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, - StorageClassName: &storageClass, - Resources: corev1.ResourceRequirements{ - Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceStorage: *resource.NewQuantity( - 100*1024*1024, resource.BinarySI), - }, - }, - }, - }, - }, - }, - }, - } - } else { // kind == MountProbe - cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{ - { - Name: volumeName, - 
VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: getPVCName(*nodeName, storageClass), - }, - }, - }, - } - } - - return nil - }) - - if err != nil { - return fmt.Errorf("failed to create CronJob: %s", getCronJobName(kind, nodeName, storageClass)) - } - if op != controllerutil.OperationResultNone { - logger.Info(fmt.Sprintf("CronJob successfully created: %s", getCronJobName(kind, nodeName, storageClass))) - } - return nil -} - // SetupWithManager sets up the controller with the Manager. func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { pred, err := predicate.LabelSelectorPredicate(*r.nodeSelector) diff --git a/internal/controller/probe_cronjob.go b/internal/controller/probe_cronjob.go new file mode 100644 index 0000000..fe03a69 --- /dev/null +++ b/internal/controller/probe_cronjob.go @@ -0,0 +1,249 @@ +package controller + +import ( + "context" + "crypto/sha1" + "encoding/hex" + "fmt" + "hash/crc32" + "io" + + "github.com/topolvm/pie/constants" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// CronJob name should be less than or equal to 52 characters. +// cf. https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ +// One CronJob is created per node and a StorageClass. +// It is desirable that the names of nodes and StorageClasses can be inferred from the names of CronJobs. +// However, if the node and StorageClass names are too long, the CronJob name will not fit in 52 characters. +// So we cut off the node and StorageClass names to an appropriate length and added a hash value at the end +// to balance readability and uniqueness. 
+func getCronJobName(kind int, nodeNamePtr *string, storageClass string) string { + nodeName := "" + if nodeNamePtr != nil { + nodeName = *nodeNamePtr + } + + sha1 := sha1.New() + io.WriteString(sha1, nodeName+"\000"+storageClass) + hashedName := hex.EncodeToString(sha1.Sum(nil)) + + maxNodeNameLength := 15 + maxStorageClassNameLength := 18 + if kind != Probe { + maxNodeNameLength = 13 + maxStorageClassNameLength = 15 + } + + if len(nodeName) > maxNodeNameLength { + nodeName = nodeName[:maxNodeNameLength] + } + if len(storageClass) > maxStorageClassNameLength { + storageClass = storageClass[:maxStorageClassNameLength] + } + + if kind == Probe { + return fmt.Sprintf("%s-%s-%s-%s", constants.ProbeNamePrefix, nodeName, storageClass, hashedName[:6]) + } else if kind == ProvisionProbe { + return fmt.Sprintf("%s-%s-%s", constants.ProvisionProbeNamePrefix, storageClass, hashedName[:6]) + } else { // kind == MountProbe + return fmt.Sprintf("%s-%s-%s-%s", constants.MountProbeNamePrefix, nodeName, storageClass, hashedName[:6]) + } +} + +func makeCronSchedule(storageClass string, nodeNamePtr *string, period int) string { + nodeName := "" + if nodeNamePtr != nil { + nodeName = *nodeNamePtr + } + + h := crc32.NewIEEE() + h.Write([]byte(storageClass)) + h.Write([]byte(nodeName)) + + return fmt.Sprintf("%d-59/%d * * * *", h.Sum32()%uint32(period), period) +} + +func addPodFinalizer(spec *corev1.PodTemplateSpec) { + finalizers := spec.GetFinalizers() + for _, finalizer := range finalizers { + if finalizer == constants.PodFinalizerName { + return + } + } + spec.SetFinalizers(append(finalizers, constants.PodFinalizerName)) +} + +func createOrUpdateJob( + ctx context.Context, + client client.Client, + kind int, + storageClass string, + namespace string, + containerImage string, + controllerURL string, + probePeriod int, + nodeName *string, +) error { + logger := log.FromContext(ctx) + logger.Info("createOrUpdateJob") + defer logger.Info("createOrUpdateJob Finished") + + cronjob := &batchv1.CronJob{} + cronjob.SetNamespace(namespace) + cronjob.SetName(getCronJobName(kind, nodeName, storageClass)) + + op, err := ctrl.CreateOrUpdate(ctx, client, cronjob, func() error { + label := map[string]string{ + constants.ProbeStorageClassLabelKey: storageClass, + } + if nodeName != nil { + label[constants.ProbeNodeLabelKey] = *nodeName + } + cronjob.SetLabels(label) + + cronjob.Spec.ConcurrencyPolicy = batchv1.ForbidConcurrent + cronjob.Spec.Schedule = makeCronSchedule(storageClass, nodeName, probePeriod) + + var successfulJobsHistoryLimit = int32(0) + cronjob.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit + + // according this doc https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#jobspec-v1-batch, + // selector is set by the system + + cronjob.Spec.JobTemplate.Spec.Template.SetLabels(label) + + addPodFinalizer(&cronjob.Spec.JobTemplate.Spec.Template) + + if len(cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers) != 1 { + cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers = []corev1.Container{{}} + } + + volumeName := "genericvol" + container := &cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers[0] + container.Name = constants.ProbeContainerName + container.Image = containerImage + + switch kind { + case Probe: + container.VolumeMounts = []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: "/mounted", + }, + } + container.Args = []string{ + "probe", + fmt.Sprintf("--destination-address=%s", controllerURL), + "--path=/mounted/", + fmt.Sprintf("--node-name=%s", 
*nodeName), + fmt.Sprintf("--storage-class=%s", storageClass), + } + + case MountProbe: + container.VolumeMounts = []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: "/mounted", + }, + } + container.Args = []string{ + "probe", + "--mount-probe", + fmt.Sprintf("--destination-address=%s", controllerURL), + "--path=/mounted/", + fmt.Sprintf("--node-name=%s", *nodeName), + fmt.Sprintf("--storage-class=%s", storageClass), + } + + case ProvisionProbe: + container.Args = []string{ + "provision-probe", + } + } + + var userID int64 = 1001 + var groupID int64 = 1001 + cronjob.Spec.JobTemplate.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{ + RunAsUser: &userID, + RunAsGroup: &groupID, + FSGroup: &groupID, + } + + cronjob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever + var periodSeconds int64 = 5 + cronjob.Spec.JobTemplate.Spec.Template.Spec.TerminationGracePeriodSeconds = &periodSeconds + + if kind == Probe || kind == MountProbe { + cronjob.Spec.JobTemplate.Spec.Template.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: corev1.LabelHostname, + Operator: corev1.NodeSelectorOpIn, + Values: []string{*nodeName}, + }, + }, + }, + }, + }, + }, + } + } + + if kind == Probe || kind == ProvisionProbe { + cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{ + { + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + Ephemeral: &corev1.EphemeralVolumeSource{ + VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{ + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + StorageClassName: &storageClass, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceStorage: *resource.NewQuantity( + 100*1024*1024, resource.BinarySI), + }, + }, + }, + }, + }, + }, + }, + } + } else { // kind == MountProbe + cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{ + { + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: getPVCName(*nodeName, storageClass), + }, + }, + }, + } + } + + return nil + }) + + if err != nil { + return fmt.Errorf("failed to create CronJob: %s", getCronJobName(kind, nodeName, storageClass)) + } + if op != controllerutil.OperationResultNone { + logger.Info(fmt.Sprintf("CronJob successfully created: %s", getCronJobName(kind, nodeName, storageClass))) + } + return nil +}
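
For reviewers: the sketch below is a small standalone illustration (not part of the patch) that mirrors the naming and scheduling logic now living in `probe_cronjob.go`. It shows how `getCronJobName`-style truncation plus a 6-character SHA-1 suffix keeps names under the 52-character CronJob limit, and how `makeCronSchedule`-style CRC-32 hashing staggers the starting minute per node/StorageClass pair. The `"mount-probe"` prefix and the node/StorageClass names are invented for the example; the real prefixes come from the `constants` package, and the truncation lengths shown are the MountProbe ones (13/15) from the moved code.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"hash/crc32"
	"io"
)

// Mirrors the MountProbe branch of getCronJobName: truncate the node and
// StorageClass names, then append the first 6 hex characters of a SHA-1 over
// the untruncated pair so the result stays unique and within 52 characters.
func exampleCronJobName(prefix, nodeName, storageClass string) string {
	h := sha1.New()
	io.WriteString(h, nodeName+"\000"+storageClass)
	suffix := hex.EncodeToString(h.Sum(nil))[:6]

	if len(nodeName) > 13 {
		nodeName = nodeName[:13]
	}
	if len(storageClass) > 15 {
		storageClass = storageClass[:15]
	}
	return fmt.Sprintf("%s-%s-%s-%s", prefix, nodeName, storageClass, suffix)
}

// Mirrors makeCronSchedule: a CRC-32 of the StorageClass and node name picks
// the starting minute, so probes for different pairs are spread across the
// period instead of all firing at minute 0.
func exampleSchedule(storageClass, nodeName string, period int) string {
	h := crc32.NewIEEE()
	h.Write([]byte(storageClass))
	h.Write([]byte(nodeName))
	return fmt.Sprintf("%d-59/%d * * * *", h.Sum32()%uint32(period), period)
}

func main() {
	// "mount-probe" stands in for constants.MountProbeNamePrefix; the node
	// and StorageClass names here are made up for illustration only.
	fmt.Println(exampleCronJobName("mount-probe", "very-long-worker-node-name-01", "topolvm-provisioner"))
	fmt.Println(exampleSchedule("topolvm-provisioner", "very-long-worker-node-name-01", 10))
}
```

As in the moved code, Probe and ProvisionProbe jobs get a generic ephemeral volume (a fresh PVC per run), while MountProbe reuses the long-lived PVC named by `getPVCName`, which is why only the MountProbe branch mounts an existing claim.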