Skip to content

Commit

Permalink
add probe_cronjob.go and move functions related to CronJobs to it
Browse files Browse the repository at this point in the history
Signed-off-by: Ryotaro Banno <[email protected]>
  • Loading branch information
ushitora-anqou committed Jan 18, 2024
1 parent 5cd92a9 commit 8baaf05
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 231 deletions.
231 changes: 0 additions & 231 deletions internal/controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/sha1"
"encoding/hex"
"fmt"
"hash/crc32"
"io"

"github.com/topolvm/pie/constants"
Expand Down Expand Up @@ -265,46 +264,6 @@ func (r *NodeReconciler) createOrUpdatePVC(
return nil
}

// CronJob name should be less than or equal to 52 characters.
// cf. https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
// One CronJob is created per node and a StorageClass.
// It is desirable that the names of nodes and StorageClasses can be inferred from the names of CronJobs.
// However, if the node and StorageClass names are too long, the CronJob name will not fit in 52 characters.
// So we cut off the node and StorageClass names to an appropriate length and added a hash value at the end
// to balance readability and uniqueness.
func getCronJobName(kind int, nodeNamePtr *string, storageClass string) string {
nodeName := ""
if nodeNamePtr != nil {
nodeName = *nodeNamePtr
}

sha1 := sha1.New()
io.WriteString(sha1, nodeName+"\000"+storageClass)
hashedName := hex.EncodeToString(sha1.Sum(nil))

maxNodeNameLength := 15
maxStorageClassNameLength := 18
if kind != Probe {
maxNodeNameLength = 13
maxStorageClassNameLength = 15
}

if len(nodeName) > maxNodeNameLength {
nodeName = nodeName[:maxNodeNameLength]
}
if len(storageClass) > maxStorageClassNameLength {
storageClass = storageClass[:maxStorageClassNameLength]
}

if kind == Probe {
return fmt.Sprintf("%s-%s-%s-%s", constants.ProbeNamePrefix, nodeName, storageClass, hashedName[:6])
} else if kind == ProvisionProbe {
return fmt.Sprintf("%s-%s-%s", constants.ProvisionProbeNamePrefix, storageClass, hashedName[:6])
} else { // kind == MountProbe
return fmt.Sprintf("%s-%s-%s-%s", constants.MountProbeNamePrefix, nodeName, storageClass, hashedName[:6])
}
}

func (r *NodeReconciler) deleteCronJob(ctx context.Context, cronJobName string) error {
var cronJobForDelete batchv1.CronJob
err := r.client.Get(ctx, client.ObjectKey{Namespace: r.namespace, Name: cronJobName}, &cronJobForDelete)
Expand All @@ -328,196 +287,6 @@ func (r *NodeReconciler) deleteCronJob(ctx context.Context, cronJobName string)
return nil
}

func makeCronSchedule(storageClass string, nodeNamePtr *string, period int) string {
nodeName := ""
if nodeNamePtr != nil {
nodeName = *nodeNamePtr
}

h := crc32.NewIEEE()
h.Write([]byte(storageClass))
h.Write([]byte(nodeName))

return fmt.Sprintf("%d-59/%d * * * *", h.Sum32()%uint32(period), period)
}

func addPodFinalizer(spec *corev1.PodTemplateSpec) {
finalizers := spec.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == constants.PodFinalizerName {
return
}
}
spec.SetFinalizers(append(finalizers, constants.PodFinalizerName))
}

func createOrUpdateJob(
ctx context.Context,
client client.Client,
kind int,
storageClass string,
namespace string,
containerImage string,
controllerURL string,
probePeriod int,
nodeName *string,
) error {
logger := log.FromContext(ctx)
logger.Info("createOrUpdateJob")
defer logger.Info("createOrUpdateJob Finished")

cronjob := &batchv1.CronJob{}
cronjob.SetNamespace(namespace)
cronjob.SetName(getCronJobName(kind, nodeName, storageClass))

op, err := ctrl.CreateOrUpdate(ctx, client, cronjob, func() error {
label := map[string]string{
constants.ProbeStorageClassLabelKey: storageClass,
}
if nodeName != nil {
label[constants.ProbeNodeLabelKey] = *nodeName
}
cronjob.SetLabels(label)

cronjob.Spec.ConcurrencyPolicy = batchv1.ForbidConcurrent
cronjob.Spec.Schedule = makeCronSchedule(storageClass, nodeName, probePeriod)

var successfulJobsHistoryLimit = int32(0)
cronjob.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit

// according this doc https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#jobspec-v1-batch,
// selector is set by the system

cronjob.Spec.JobTemplate.Spec.Template.SetLabels(label)

addPodFinalizer(&cronjob.Spec.JobTemplate.Spec.Template)

if len(cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers) != 1 {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers = []corev1.Container{{}}
}

volumeName := "genericvol"
container := &cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
container.Name = constants.ProbeContainerName
container.Image = containerImage

switch kind {
case Probe:
container.VolumeMounts = []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/mounted",
},
}
container.Args = []string{
"probe",
fmt.Sprintf("--destination-address=%s", controllerURL),
"--path=/mounted/",
fmt.Sprintf("--node-name=%s", *nodeName),
fmt.Sprintf("--storage-class=%s", storageClass),
}

case MountProbe:
container.VolumeMounts = []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/mounted",
},
}
container.Args = []string{
"probe",
"--mount-probe",
fmt.Sprintf("--destination-address=%s", controllerURL),
"--path=/mounted/",
fmt.Sprintf("--node-name=%s", *nodeName),
fmt.Sprintf("--storage-class=%s", storageClass),
}

case ProvisionProbe:
container.Args = []string{
"provision-probe",
}
}

var userID int64 = 1001
var groupID int64 = 1001
cronjob.Spec.JobTemplate.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{
RunAsUser: &userID,
RunAsGroup: &groupID,
FSGroup: &groupID,
}

cronjob.Spec.JobTemplate.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
var periodSeconds int64 = 5
cronjob.Spec.JobTemplate.Spec.Template.Spec.TerminationGracePeriodSeconds = &periodSeconds

if kind == Probe || kind == MountProbe {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: corev1.LabelHostname,
Operator: corev1.NodeSelectorOpIn,
Values: []string{*nodeName},
},
},
},
},
},
},
}
}

if kind == Probe || kind == ProvisionProbe {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
StorageClassName: &storageClass,
Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: *resource.NewQuantity(
100*1024*1024, resource.BinarySI),
},
},
},
},
},
},
},
}
} else { // kind == MountProbe
cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: getPVCName(*nodeName, storageClass),
},
},
},
}
}

return nil
})

if err != nil {
return fmt.Errorf("failed to create CronJob: %s", getCronJobName(kind, nodeName, storageClass))
}
if op != controllerutil.OperationResultNone {
logger.Info(fmt.Sprintf("CronJob successfully created: %s", getCronJobName(kind, nodeName, storageClass)))
}
return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
pred, err := predicate.LabelSelectorPredicate(*r.nodeSelector)
Expand Down
Loading

0 comments on commit 8baaf05

Please sign in to comment.