Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ushitora-anqou committed Jan 12, 2024
1 parent 6cef66c commit 1ec6755
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 53 deletions.
3 changes: 3 additions & 0 deletions cmd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,11 @@ func subMain() error {

storageClassReconciler := controller.NewStorageClassReconciler(
mgr.GetClient(),
containerImage,
namespace,
controllerURL,
monitoringStorageClasses,
probePeriod,
)
err = storageClassReconciler.SetupWithManager(mgr)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions cmd/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ var probeConfig struct {
nodeName string
}

var provisionProbeCmd = &cobra.Command{
Use: "provision-probe",
RunE: func(cmd *cobra.Command, args []string) error {
// Nothing to do
return nil
},
}

func init() {
fs := probeCmd.Flags()
fs.StringVar(&probeConfig.controllerAddr, "destination-address", "http://localhost:8080", "metrics aggregator's address")
Expand All @@ -40,4 +48,5 @@ func init() {
fs.StringVar(&probeConfig.nodeName, "node-name", "", "node name")

rootCmd.AddCommand(probeCmd)
rootCmd.AddCommand(provisionProbeCmd)
}
161 changes: 112 additions & 49 deletions internal/controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
Probe = iota
ProvisionProbe
MountProbe
)

var (
nodeCtrlLogger = ctrl.Log.WithName("node-reconciler")
)
Expand Down Expand Up @@ -91,7 +97,7 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}
return ctrl.Result{}, err
}
err = r.deleteCronJob(ctx, getCronJobName(node.Name, storageClass))
err = r.deleteCronJob(ctx, getCronJobName(&node.Name, storageClass))
if err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
Expand All @@ -117,7 +123,17 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}
return ctrl.Result{}, err
}
err = r.createOrUpdateJob(ctx, storageClass, node.Name)
err = createOrUpdateJob(
ctx,
r.client,
Probe,
storageClass,
r.namespace,
r.containerImage,
r.controllerURL,
r.probePeriod,
&node.Name,
)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -141,7 +157,12 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
// However, if the node and StorageClass names are too long, the CronJob name will not fit in 52 characters.
// So we cut off the node and StorageClass names to an appropriate length and added a hash value at the end
// to balance readability and uniqueness.
func getCronJobName(nodeName, storageClass string) string {
func getCronJobName(nodeNamePtr *string, storageClass string) string {
nodeName := ""
if nodeNamePtr != nil {
nodeName = *nodeNamePtr
}

sha1 := sha1.New()
io.WriteString(sha1, nodeName+"\000"+storageClass)
hashedName := hex.EncodeToString(sha1.Sum(nil))
Expand Down Expand Up @@ -178,7 +199,12 @@ func (r *NodeReconciler) deleteCronJob(ctx context.Context, cronJobName string)
return nil
}

func makeCronSchedule(storageClass, nodeName string, period int) string {
func makeCronSchedule(storageClass string, nodeNamePtr *string, period int) string {
nodeName := ""
if nodeNamePtr != nil {
nodeName = *nodeNamePtr
}

h := crc32.NewIEEE()
h.Write([]byte(storageClass))
h.Write([]byte(nodeName))
Expand All @@ -196,23 +222,35 @@ func addPodFinalizer(spec *corev1.PodTemplateSpec) {
spec.SetFinalizers(append(finalizers, constants.PodFinalizerName))
}

func (r *NodeReconciler) createOrUpdateJob(ctx context.Context, storageClass, nodeName string) error {
func createOrUpdateJob(
ctx context.Context,
client client.Client,
kind int,
storageClass string,
namespace string,
containerImage string,
controllerURL string,
probePeriod int,
nodeName *string,
) error {
nodeCtrlLogger.Info("createOrUpdateJob")
defer nodeCtrlLogger.Info("createOrUpdateJob Finished")

cronjob := &batchv1.CronJob{}
cronjob.SetNamespace(r.namespace)
cronjob.SetNamespace(namespace)
cronjob.SetName(getCronJobName(nodeName, storageClass))

op, err := ctrl.CreateOrUpdate(ctx, r.client, cronjob, func() error {
op, err := ctrl.CreateOrUpdate(ctx, client, cronjob, func() error {
label := map[string]string{
constants.ProbeNodeLabelKey: nodeName,
constants.ProbeStorageClassLabelKey: storageClass,
}
if nodeName != nil {
label[constants.ProbeNodeLabelKey] = *nodeName
}
cronjob.SetLabels(label)

cronjob.Spec.ConcurrencyPolicy = batchv1.ForbidConcurrent
cronjob.Spec.Schedule = makeCronSchedule(storageClass, nodeName, r.probePeriod)
cronjob.Spec.Schedule = makeCronSchedule(storageClass, nodeName, probePeriod)

var successfulJobsHistoryLimit = int32(0)
cronjob.Spec.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit
Expand All @@ -231,20 +269,31 @@ func (r *NodeReconciler) createOrUpdateJob(ctx context.Context, storageClass, no
volumeName := "genericvol"
container := &cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers[0]
container.Name = constants.ProbeContainerName
container.Image = r.containerImage
container.VolumeMounts = []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/mounted",
},
}
container.Image = containerImage

switch kind {
case Probe:
container.VolumeMounts = []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/mounted",
},
}
container.Args = []string{
"probe",
fmt.Sprintf("--destination-address=%s", controllerURL),
"--path=/mounted/",
fmt.Sprintf("--node-name=%s", *nodeName),
fmt.Sprintf("--storage-class=%s", storageClass),
}

case MountProbe:
// FIXME: write here

container.Args = []string{
"probe",
fmt.Sprintf("--destination-address=%s", r.controllerURL),
"--path=/mounted/",
fmt.Sprintf("--node-name=%s", nodeName),
fmt.Sprintf("--storage-class=%s", storageClass),
case ProvisionProbe:
container.Args = []string{
"provision-probe",
}
}

var userID int64 = 1001
Expand All @@ -259,54 +308,68 @@ func (r *NodeReconciler) createOrUpdateJob(ctx context.Context, storageClass, no
var periodSeconds int64 = 5
cronjob.Spec.JobTemplate.Spec.Template.Spec.TerminationGracePeriodSeconds = &periodSeconds

cronjob.Spec.JobTemplate.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: corev1.LabelHostname,
Operator: corev1.NodeSelectorOpIn,
Values: []string{nodeName},
if kind == Probe || kind == MountProbe {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: corev1.LabelHostname,
Operator: corev1.NodeSelectorOpIn,
Values: []string{*nodeName},
},
},
},
},
},
},
},
}
}

cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
StorageClassName: &storageClass,
Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: *resource.NewQuantity(
100*1024*1024, resource.BinarySI),
if kind == Probe || kind == ProvisionProbe {
cronjob.Spec.JobTemplate.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
StorageClassName: &storageClass,
Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceStorage: *resource.NewQuantity(
100*1024*1024, resource.BinarySI),
},
},
},
},
},
},
},
},
}
} else { // kind == MountProbe
// FIXME: write here
}

return nil
})

if err != nil {
return fmt.Errorf("failed to create CronJob on node %s of storageclass %s: %w", nodeName, storageClass, err)
if nodeName == nil {
return fmt.Errorf("failed to create CronJob of storageclass %s: %w", storageClass, err)
} else {
return fmt.Errorf("failed to create CronJob on node %s of storageclass %s: %w", *nodeName, storageClass, err)
}
}
if op != controllerutil.OperationResultNone {
nodeCtrlLogger.Info(fmt.Sprintf("CronJob successfully created node %s of storageclass %s: %s", nodeName, storageClass, op))
if nodeName == nil {
nodeCtrlLogger.Info(fmt.Sprintf("CronJob successfully created of storageclass %s: %s", storageClass, op))
} else {
nodeCtrlLogger.Info(fmt.Sprintf("CronJob successfully created node %s of storageclass %s: %s", *nodeName, storageClass, op))
}
}
return nil
}
Expand Down
14 changes: 12 additions & 2 deletions internal/controller/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ var _ = Describe("Node controller", func() {
err = nodeReconciler.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

storageClassReconciler := NewStorageClassReconciler(k8sClient, "default", monitoringStorageClasses)
storageClassReconciler := NewStorageClassReconciler(
k8sClient,
"dummy",
"default",
"http://localhost:8082",
monitoringStorageClasses,
1,
)
err = storageClassReconciler.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -87,7 +94,10 @@ var _ = Describe("Node controller", func() {
g.Expect(apierrors.IsNotFound(err)).Should(BeTrue())

var cronJobList batchv1.CronJobList
err = k8sClient.List(ctx, &cronJobList)
err = k8sClient.List(ctx, &cronJobList, client.MatchingLabels(map[string]string{
"storage-class": "sc",
"node": "192.168.0.1",
}))
g.Expect(err).NotTo(HaveOccurred())
g.Expect(cronJobList.Items).Should(BeEmpty())
}).Should(Succeed())
Expand Down
30 changes: 29 additions & 1 deletion internal/controller/storageclass_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,32 @@ import (
// StorageClassReconciler reconciles a StorageClass object
type StorageClassReconciler struct {
client client.Client
containerImage string
namespace string
controllerURL string
monitoringStorageClasses map[string]struct{}
probePeriod int
}

func NewStorageClassReconciler(
client client.Client,
containerImage string,
namespace string,
controllerURL string,
monitoringStorageClasses []string,
probePeriod int,
) *StorageClassReconciler {
storageClass := make(map[string]struct{})
for _, sc := range monitoringStorageClasses {
storageClass[sc] = struct{}{}
}
return &StorageClassReconciler{
client: client,
containerImage: containerImage,
namespace: namespace,
controllerURL: controllerURL,
monitoringStorageClasses: storageClass,
probePeriod: probePeriod,
}
}

Expand Down Expand Up @@ -62,7 +71,7 @@ func (r *StorageClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !storageClass.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(&storageClass, constants.StorageClassFinalizerName) {
label := map[string]string{
"storage-class": req.Name,
constants.ProbeStorageClassLabelKey: req.Name,
}
err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, client.InNamespace(r.namespace), client.MatchingLabels(label))
if err != nil {
Expand All @@ -86,6 +95,25 @@ func (r *StorageClassReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

// Create/update provision-probe cronjob if storageClass is included in monitoringStorageClasses
_, ok := r.monitoringStorageClasses[storageClass.GetName()]
if ok {
err := createOrUpdateJob(
ctx,
r.client,
ProvisionProbe,
storageClass.Name,
r.namespace,
r.containerImage,
r.controllerURL,
r.probePeriod,
nil,
)
if err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

Expand Down
9 changes: 8 additions & 1 deletion internal/controller/storageclass_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ var _ = Describe("StorageClass controller", func() {
err = nodeReconciler.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

storageClassReconciler := NewStorageClassReconciler(k8sClient, "default", monitoringStorageClasses)
storageClassReconciler := NewStorageClassReconciler(
k8sClient,
"dummy",
"default",
"http://localhost:8082",
monitoringStorageClasses,
1,
)
err = storageClassReconciler.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

Expand Down

0 comments on commit 1ec6755

Please sign in to comment.