Skip to content

Commit

Permalink
fix(cinder-csi-plugin): controllerServer ListVolumes with `--max-entr…
Browse files Browse the repository at this point in the history
…ies` arg

Add unit tests cases on listvolumes function in multicloud configuration
with and without `--max-entries` arg

Signed-off-by: MatthieuFin <[email protected]>
  • Loading branch information
MatthieuFin committed Jul 3, 2024
1 parent 89383ee commit b0e5a1c
Show file tree
Hide file tree
Showing 2 changed files with 948 additions and 46 deletions.
86 changes: 65 additions & 21 deletions pkg/csi/cinder/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,31 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
type CloudsStartingToken struct {
CloudName string `json:"cloud"`
Token string `json:"token"`
isEmpty bool
}

func (cs *controllerServer) extractNodeIDs(attachments []volumes.Attachment) []string {
nodeIDs := make([]string, len(attachments))
for i, attachment := range attachments {
nodeIDs[i] = attachment.ServerID
}
return nodeIDs
}

func (cs *controllerServer) createVolumeEntries(vlist []volumes.Volume) []*csi.ListVolumesResponse_Entry {
entries := make([]*csi.ListVolumesResponse_Entry, len(vlist))
for i, v := range vlist {
entries[i] = &csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: v.ID,
CapacityBytes: int64(v.Size * 1024 * 1024 * 1024),
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: cs.extractNodeIDs(v.Attachments),
},
}
}
return entries
}

func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
Expand All @@ -362,6 +387,7 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
var cloudsToken = CloudsStartingToken{
CloudName: "",
Token: "",
isEmpty: len(req.StartingToken) == 0,
}

cloudsNames := maps.Keys(cs.Clouds)
Expand All @@ -381,6 +407,16 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
var vlist []volumes.Volume
var nextPageToken string

if !cloudsToken.isEmpty && startingToken == "" {
// previous call ended on last volumes from "currentCloudName" we should pass to next one
for i := range cloudsNames {
if cloudsNames[i] == currentCloudName {
currentCloudName = cloudsNames[i+1]
break
}
}
}

startIdx := 0
for _, cloudName := range cloudsNames {
if cloudName == currentCloudName {
Expand All @@ -394,6 +430,7 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
} else {
vlist, nextPageToken, err = cs.Clouds[cloudsNames[idx]].ListVolumes(maxEntries, startingToken)
}
startingToken = nextPageToken
if err != nil {
klog.Errorf("Failed to ListVolumes: %v", err)
if cpoerrors.IsInvalidError(err) {
Expand All @@ -402,29 +439,13 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
}

ventries := make([]*csi.ListVolumesResponse_Entry, 0, len(vlist))
for _, v := range vlist {
ventry := csi.ListVolumesResponse_Entry{
Volume: &csi.Volume{
VolumeId: v.ID,
CapacityBytes: int64(v.Size * 1024 * 1024 * 1024),
},
}

status := &csi.ListVolumesResponse_VolumeStatus{}
status.PublishedNodeIds = make([]string, 0, len(v.Attachments))
for _, attachment := range v.Attachments {
status.PublishedNodeIds = append(status.PublishedNodeIds, attachment.ServerID)
}
ventry.Status = status

ventries = append(ventries, &ventry)
}
klog.V(4).Infof("ListVolumes: retreived %d entries and %q next token from cloud %q", len(ventries), nextPageToken, cloudsNames[idx])
ventries := cs.createVolumeEntries(vlist)
klog.V(4).Infof("ListVolumes: retrieved %d entries and %q next token from cloud %q", len(ventries), nextPageToken, cloudsNames[idx])

cloudsVentries = append(cloudsVentries, ventries...)

// Reach maxEntries setup nextToken with cloud identifier if needed
sendEmptyToken := false
if maxEntries > 0 && len(cloudsVentries) == maxEntries {
if nextPageToken == "" {
if idx+1 == len(cloudsNames) {
Expand All @@ -437,12 +458,35 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
}, nil
} else {
// still clouds to process
cloudsToken.CloudName = cloudsNames[idx+1]
// set token to next non empty cloud
i := 0
for i = idx + 1; i < len(cloudsNames); i++ {
vlistTmp, _, err := cs.Clouds[cloudsNames[i]].ListVolumes(1, "")
if err != nil {
klog.Errorf("Failed to ListVolumes: %v", err)
if cpoerrors.IsInvalidError(err) {
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
}
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
}
if len(vlistTmp) > 0 {
cloudsToken.CloudName = cloudsNames[i]
cloudsToken.isEmpty = false
break
}
}
if i == len(cloudsNames) {
sendEmptyToken = true
}
}
}
cloudsToken.CloudName = cloudsNames[idx]
cloudsToken.Token = nextPageToken
data, _ := json.Marshal(cloudsToken)
var data []byte
data, _ = json.Marshal(cloudsToken)
if sendEmptyToken {
data = []byte("")
}
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), string(data))
return &csi.ListVolumesResponse{
Entries: cloudsVentries,
Expand Down
Loading

0 comments on commit b0e5a1c

Please sign in to comment.