Skip to content

Commit

Permalink
Fix concurrent map writes bug of K8s metrics API client
Browse files Browse the repository at this point in the history
The pvc-autoresizer can use Prometheus or the K8s metrics API as sources
of metrics to determine whether a PVC needs to be resized. There is a
concurrent map access bug in the K8s metrics API client implementation,
which causes the pvc-autoresizer to crash. This commit fixes this issue.

Signed-off-by: Daichi Mukai <[email protected]>
  • Loading branch information
daichimukai committed Jul 4, 2024
1 parent 0723820 commit f9e1160
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions internal/runners/k8s_metrics_api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runners
import (
"bytes"
"context"
"sync"

"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -46,13 +47,23 @@ func (c *k8sMetricsApiClient) GetMetrics(ctx context.Context) (map[types.Namespa

// create a map to hold PVC usage data
pvcUsage := make(map[types.NamespacedName]*VolumeStats)
var mu sync.Mutex // serialize writes to pvcUsage

// use an errgroup to query kubelet for PVC usage on each node
eg, ctx := errgroup.WithContext(ctx)
for _, node := range nodes.Items {
nodeName := node.Name
eg.Go(func() error {
return getPVCUsage(clientset, nodeName, pvcUsage, ctx)
nodePVCUsage, err := getPVCUsageFromK8sMetricsAPI(ctx, clientset, nodeName)
if err != nil {
return nil
}
mu.Lock()
defer mu.Unlock()
for k, v := range nodePVCUsage {
pvcUsage[k] = v
}
return nil
})
}

Expand All @@ -65,7 +76,9 @@ func (c *k8sMetricsApiClient) GetMetrics(ctx context.Context) (map[types.Namespa
return pvcUsage, nil
}

func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[types.NamespacedName]*VolumeStats, ctx context.Context) error {
func getPVCUsageFromK8sMetricsAPI(
ctx context.Context, clientset *kubernetes.Clientset, nodeName string,
) (map[types.NamespacedName]*VolumeStats, error) {
// make the request to the api /metrics endpoint and handle the response
req := clientset.
CoreV1().
Expand All @@ -77,19 +90,21 @@ func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[
Suffix("metrics")
respBody, err := req.DoRaw(ctx)
if err != nil {
return errors.Errorf("failed to get stats from kubelet on node %s: with error %s", nodeName, err)
return nil, errors.Errorf("failed to get stats from kubelet on node %s: with error %s", nodeName, err)
}
parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(respBody))
if err != nil {
return errors.Wrapf(err, "failed to read response body from kubelet on node %s", nodeName)
return nil, errors.Wrapf(err, "failed to read response body from kubelet on node %s", nodeName)
}

pvcUsage := make(map[types.NamespacedName]*VolumeStats)

// volumeAvailableQuery
if gauge, ok := metricFamilies[volumeAvailableQuery]; ok {
for _, m := range gauge.Metric {
pvcName, value := parseMetric(m)
pvcUsage[pvcName] = new(VolumeStats)
pvcUsage[pvcName] = &VolumeStats{}
pvcUsage[pvcName].AvailableBytes = int64(value)
}
}
Expand All @@ -116,7 +131,7 @@ func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[
pvcUsage[pvcName].CapacityInodeSize = int64(value)
}
}
return nil
return pvcUsage, nil
}

func parseMetric(m *dto.Metric) (pvcName types.NamespacedName, value uint64) {
Expand Down

0 comments on commit f9e1160

Please sign in to comment.