Skip to content

Commit

Permalink
Merge pull request #416 from andrewd-zededa/csi-handler-details-alloc…
Browse files Browse the repository at this point in the history
…atedSpace

Metrics Cleanup: Fix pvc allocated space reporting and pvc size at creation time.
  • Loading branch information
zedi-pramodh authored Jan 3, 2025
2 parents f0268aa + 7ca4294 commit 1b46a77
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 61 deletions.
7 changes: 5 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlediskmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func createOrUpdateDiskMetrics(ctx *volumemgrContext, wdName string) {
publishDiskMetrics(ctx, diskMetricList...)
for _, volumeStatus := range getAllVolumeStatus(ctx) {
ctx.ps.StillRunning(wdName, warningTime, errorTime)
if err := createOrUpdateAppDiskMetrics(ctx, volumeStatus); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, wdName, volumeStatus); err != nil {
log.Errorf("CreateOrUpdateCommonDiskMetrics: exception while publishing diskmetric. %s", err.Error())
}
}
}

func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.VolumeStatus) error {
func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, wdName string, volumeStatus *types.VolumeStatus) error {
log.Functionf("createOrUpdateAppDiskMetrics(%s, %s)", volumeStatus.VolumeID, volumeStatus.FileLocation)

if volumeStatus.FileLocation == "" {
Expand All @@ -328,9 +328,12 @@ func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.Vol
} else {
// Kubevirt eve volumes have no location on /persist, they are PVCs
volumeStatus.FileLocation = volumeStatus.GetPVCName()
publishVolumeStatus(ctx, volumeStatus)
}
}

// Some handlers (csi) can have http timeouts, update the watchdog
ctx.ps.StillRunning(wdName, warningTime, errorTime)
actualSize, maxSize, diskType, dirtyFlag, err := volumehandlers.GetVolumeHandler(log, ctx, volumeStatus).GetVolumeDetails()
if err != nil {
err = fmt.Errorf("createOrUpdateAppDiskMetrics(%s, %s): exception while getting volume size. %s",
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func createOrUpdatePvcDiskMetrics(ctx *volumemgrContext) {
continue
}

pvcMetric := lookupDiskMetric(ctx, pvcName)
pvcMetric := lookupDiskMetric(ctx, sdName+"-"+pvcName)
if pvcMetric == nil {
pvcMetric = &types.DiskMetric{DiskPath: pvcName, IsDir: false}
pvcMetric = &types.DiskMetric{DiskPath: sdName + "-" + pvcName, IsDir: false}
}
pvcMetric.ReadBytes = metric.ReadBytes
pvcMetric.WriteBytes = metric.WriteBytes
Expand Down
12 changes: 6 additions & 6 deletions pkg/pillar/cmd/volumemgr/handlevolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
status.SetError(errStr, time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -65,7 +65,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
updateVolumeStatusRefCount(ctx, status)
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
status.SetError(err.Error(), time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand Down Expand Up @@ -169,7 +169,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
}
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -180,7 +180,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
}
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
log.Tracef("handleDeferredVolumeCreate(%s) done", key)
Expand Down Expand Up @@ -329,7 +329,7 @@ func maybeSpaceAvailable(ctx *volumemgrContext) {
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("maybeSpaceAvailable(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlevolumeref.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func handleVolumeRefCreate(ctxArg interface{}, key string,
if changed {
publishVolumeStatus(ctx, vs)
updateVolumeRefStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefCreate(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func handleVolumeRefModify(ctxArg interface{}, key string,
}
updateVolumeStatusRefCount(ctx, vs)
publishVolumeStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefModify(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/updatestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ func updateVolumeStatus(ctx *volumemgrContext, volumeID uuid.UUID) bool {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
}
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -938,7 +938,7 @@ func updateVolumeStatusFromContentID(ctx *volumemgrContext, contentID uuid.UUID)
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/pillar/kubeapi/longhorninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ func min(a, b types.ServiceStatus) types.ServiceStatus {
return b
}

// LonghornVolumeSizeDetails returns the provisionedBytes and allocatedBytes size values for a longhorn volume
func LonghornVolumeSizeDetails(longhornVolumeName string) (provisionedBytes uint64, allocatedBytes uint64, err error) {
config, err := GetKubeConfig()
if err != nil {
return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get kubeconfig %v", err)
}

lhClient, err := versioned.NewForConfig(config)
if err != nil {
return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get versioned config: %v", err)
}

// Don't allow a k8s api timeout keep us waiting forever, set this one explicitly as its used in metrics path
shortContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

lhVol, err := lhClient.LonghornV1beta2().Volumes("longhorn-system").Get(shortContext, longhornVolumeName, metav1.GetOptions{})
if err != nil || lhVol == nil {
return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get lh vol err:%v", err)
}

return uint64(lhVol.Spec.Size), uint64(lhVol.Status.ActualSize), nil
}

// PopulateKVIFromPVCName uses the longhorn api to retrieve volume and replica health
// to be sent out to the controller as info messages
func PopulateKVIFromPVCName(kvi *types.KubeVolumeInfo) (*types.KubeVolumeInfo, error) {
Expand Down
69 changes: 24 additions & 45 deletions pkg/pillar/kubeapi/vitoapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
zconfig "github.com/lf-edge/eve-api/go/config"
//"github.com/lf-edge/eve/pkg/newlog/go/pkg/mod/github.com/google/[email protected]+incompatible/log"
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/diskmetrics"
"github.com/lf-edge/eve/pkg/pillar/types"
corev1 "k8s.io/api/core/v1"
v1errors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -144,42 +143,39 @@ func GetPVCInfo(pvcName string, log *base.LogObject) (*types.ImgInfo, error) {
return nil, err
}

fmt := zconfig.Format_name[int32(zconfig.Format_PVC)]
imgfmt := zconfig.Format_name[int32(zconfig.Format_PVC)]
imgInfo := types.ImgInfo{
Format: fmt,
Format: imgfmt,
Filename: pvcName,
DirtyFlag: false,
}
// Get the actual and used size of the PVC.
actualSizeBytes, usedSizeBytes := getPVCSizes(pvc)

imgInfo.ActualSize = actualSizeBytes
imgInfo.VirtualSize = usedSizeBytes
// PVC asks for a minimum size, spec may be less than actual (status) provisioned
provisionedBytes := getPVCSize(pvc)

return &imgInfo, nil
}
// Ask longhorn for the PVCs backing-volume allocated space
_, allocatedBytes, err := LonghornVolumeSizeDetails(pvc.Spec.VolumeName)
if err != nil {
err = fmt.Errorf("GetPVCInfo failed to get info for pvc %s volume %s: %v", pvcName, pvc.Spec.VolumeName, err)
log.Error(err)
return &imgInfo, err
}

// Returns the actual and used size of the PVC in bytes
func getPVCSizes(pvc *corev1.PersistentVolumeClaim) (actualSizeBytes, usedSizeBytes uint64) {
// Extract the actual size of the PVC from its spec.
actualSizeBytes = 0
usedSizeBytes = 0
imgInfo.ActualSize = allocatedBytes
imgInfo.VirtualSize = provisionedBytes

if pvc.Spec.Resources.Requests != nil {
if quantity, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; ok {
actualSizeBytes = uint64(quantity.Value())
}
}
return &imgInfo, nil
}

// Extract the used size of the PVC from its status.
// Returns the provisioned size of the PVC in bytes
func getPVCSize(pvc *corev1.PersistentVolumeClaim) (provisionedSizeBytes uint64) {
// Status field contains the size of the volume which actually bound to the claim
if pvc.Status.Phase == corev1.ClaimBound {
if quantity, ok := pvc.Status.Capacity[corev1.ResourceStorage]; ok {
usedSizeBytes = uint64(quantity.Value())
return uint64(quantity.Value())
}
}

return actualSizeBytes, usedSizeBytes

return 0
}

// longhorn PVC deals with Ki Mi not KB, MB
Expand Down Expand Up @@ -227,7 +223,7 @@ func NewPVCDefinition(pvcName string, size string, annotations,
// diskfile can be in qcow or raw format
// If pvc does not exist, the command will create PVC and copies the data.
func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
diskfile string, pvcName string, filemode bool) error {
diskfile string, pvcName string, filemode bool, pvcSize uint64) error {

// Get the Kubernetes clientset
clientset, err := GetClientSet()
Expand Down Expand Up @@ -262,27 +258,10 @@ func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
clusterIP := service.Spec.ClusterIP
uploadproxyURL := "https://" + clusterIP + ":443"
log.Noticef("RolloutDiskToPVC diskfile %s pvc %s URL %s", diskfile, pvcName, uploadproxyURL)
volSize, err := diskmetrics.GetDiskVirtualSize(log, diskfile)
if err != nil {
err = fmt.Errorf("failed to get virtual size of disk %s: %v", diskfile, err)
log.Error(err)
return err
}

// ActualSize can be larger than VirtualSize for fully-allocated/not-thin QCOW2 files
actualVolSize, err := diskmetrics.GetDiskActualSize(log, diskfile)
if err != nil {
err = fmt.Errorf("failed to get actual size of disk %s: %v", diskfile, err)
log.Error(err)
return err
}
if actualVolSize > volSize {
volSize = actualVolSize
}

// Create PVC and then copy data. We create PVC to set the designated node id label.
if !exists {
err = CreatePVC(pvcName, volSize, log)
err = CreatePVC(pvcName, pvcSize, log)
if err != nil {
err = fmt.Errorf("Error creating PVC %s", pvcName)
log.Error(err)
Expand Down Expand Up @@ -313,15 +292,15 @@ func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
args = append(args, "--no-create")
} else {
// Add size
args = append(args, "--size", fmt.Sprint(volSize))
args = append(args, "--size", fmt.Sprint(pvcSize))
}

log.Noticef("virtctl args %v", args)

uploadTry := 0
maxRetries := 10
timeoutBaseSeconds := int64(300) // 5 min
volSizeGB := int64(volSize / 1024 / 1024 / 1024)
volSizeGB := int64(pvcSize / 1024 / 1024 / 1024)
timeoutPer1GBSeconds := int64(120)
timeout := time.Duration(timeoutBaseSeconds + (volSizeGB * timeoutPer1GBSeconds))
log.Noticef("RolloutDiskToPVC calculated timeout to %d seconds due to volume size %d GB", timeout, volSizeGB)
Expand Down
6 changes: 4 additions & 2 deletions pkg/pillar/volumehandlers/csihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (handler *volumeHandlerCSI) CreateVolume() (string, error) {
return "", errors.New(errStr)
}

pvcerr := kubeapi.RolloutDiskToPVC(createContext, handler.log, false, rawImgFile, pvcName, false)
pvcerr := kubeapi.RolloutDiskToPVC(createContext, handler.log, false, rawImgFile, pvcName, false, pvcSize)

// Since we succeeded or failed to create PVC above, no point in keeping the rawImgFile.
// Delete it to save space.
Expand All @@ -198,7 +198,7 @@ func (handler *volumeHandlerCSI) CreateVolume() (string, error) {
return pvcName, errors.New(errStr)
}
// Convert qcow2 to PVC
err = kubeapi.RolloutDiskToPVC(createContext, handler.log, false, qcowFile, pvcName, false)
err = kubeapi.RolloutDiskToPVC(createContext, handler.log, false, qcowFile, pvcName, false, pvcSize)

if err != nil {
errStr := fmt.Sprintf("Error converting %s to PVC %s: %v",
Expand Down Expand Up @@ -244,6 +244,8 @@ func (handler *volumeHandlerCSI) DestroyVolume() (string, error) {
func (handler *volumeHandlerCSI) Populate() (bool, error) {
pvcName := handler.status.GetPVCName()
isReplicated := handler.status.IsReplicated
// Kubevirt eve volumes have no location on /persist, they are PVCs
handler.status.FileLocation = pvcName
handler.log.Noticef("Populate called for PVC %s", pvcName)
// A replicated volume is created on designated node, this node is supposed to be a replica volume.
// so wait until the replica is created. It could happen that the designated node did not even receive
Expand Down

0 comments on commit 1b46a77

Please sign in to comment.