Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent map writes bug in the K8s metrics API client #268

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions internal/runners/k8s_metrics_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runners
import (
"bytes"
"context"
"sync"

"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -46,13 +47,23 @@ func (c *k8sMetricsApiClient) GetMetrics(ctx context.Context) (map[types.Namespa

// create a map to hold PVC usage data
pvcUsage := make(map[types.NamespacedName]*VolumeStats)
var mu sync.Mutex // serialize writes to pvcUsage

// use an errgroup to query kubelet for PVC usage on each node
eg, ctx := errgroup.WithContext(ctx)
for _, node := range nodes.Items {
nodeName := node.Name
eg.Go(func() error {
return getPVCUsage(clientset, nodeName, pvcUsage, ctx)
nodePVCUsage, err := getPVCUsageFromK8sMetricsAPI(ctx, clientset, nodeName)
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
for k, v := range nodePVCUsage {
pvcUsage[k] = v
}
return nil
})
}

Expand All @@ -65,7 +76,9 @@ func (c *k8sMetricsApiClient) GetMetrics(ctx context.Context) (map[types.Namespa
return pvcUsage, nil
}

func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[types.NamespacedName]*VolumeStats, ctx context.Context) error {
func getPVCUsageFromK8sMetricsAPI(
ctx context.Context, clientset *kubernetes.Clientset, nodeName string,
) (map[types.NamespacedName]*VolumeStats, error) {
// make the request to the api /metrics endpoint and handle the response
req := clientset.
CoreV1().
Expand All @@ -77,19 +90,21 @@ func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[
Suffix("metrics")
respBody, err := req.DoRaw(ctx)
if err != nil {
return errors.Errorf("failed to get stats from kubelet on node %s: with error %s", nodeName, err)
return nil, errors.Errorf("failed to get stats from kubelet on node %s: with error %s", nodeName, err)
}
parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(respBody))
if err != nil {
return errors.Wrapf(err, "failed to read response body from kubelet on node %s", nodeName)
return nil, errors.Wrapf(err, "failed to read response body from kubelet on node %s", nodeName)
}

pvcUsage := make(map[types.NamespacedName]*VolumeStats)

// volumeAvailableQuery
if gauge, ok := metricFamilies[volumeAvailableQuery]; ok {
for _, m := range gauge.Metric {
pvcName, value := parseMetric(m)
pvcUsage[pvcName] = new(VolumeStats)
pvcUsage[pvcName] = &VolumeStats{}
pvcUsage[pvcName].AvailableBytes = int64(value)
}
}
Expand All @@ -116,7 +131,7 @@ func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[
pvcUsage[pvcName].CapacityInodeSize = int64(value)
}
}
return nil
return pvcUsage, nil
}

func parseMetric(m *dto.Metric) (pvcName types.NamespacedName, value uint64) {
Expand Down