Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: New node gRPC server for initiator retrieval #76

Merged
merged 5 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ help:
all: clean bin openshift push
openshift-all: clean openshift push

bin: controller node
bin: protoc controller node

protoc:
@echo ""
@echo "[] protocol buffers"
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pkg/node_service/node_servicepb/node_rpc.proto

controller:
@echo ""
Expand Down
11 changes: 11 additions & 0 deletions helm/csi-charts/templates/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ spec:
- -bind=unix://{{ .Values.kubeletPath }}/plugins/csi-exos-x.seagate.com/csi.sock
- -chroot=/host
{{- include "csidriver.extraArgs" .Values.node | indent 10 }}
env:
- name: CSI_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: CSI_NODE_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: CSI_NODE_SERVICE_PORT
value: "978"
securityContext:
privileged: true
volumeMounts:
Expand Down
3 changes: 3 additions & 0 deletions helm/csi-charts/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ spec:
- seagate-exos-x-csi-controller
- -bind=unix:///csi/csi.sock
{{- include "csidriver.extraArgs" .Values.controller | indent 10 }}
env:
- name: CSI_NODE_SERVICE_PORT
value: "978"
volumeMounts:
- name: socket-dir
mountPath: /csi
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ const (
MaximumLUN = 255
VolumeNameMaxLength = 31
VolumePrefixMaxLength = 3

//If changed, must also be updated in helm charts
NodeIPEnvVar = "CSI_NODE_IP"
NodeNameEnvVar = "CSI_NODE_NAME"
NodeServicePortEnvVar = "CSI_NODE_SERVICE_PORT"
)

// Driver contains main resources needed by the driver and references the underlying specific driver
Expand Down
7 changes: 0 additions & 7 deletions pkg/common/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ func (driver *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPlu
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
},
}, nil
}
20 changes: 4 additions & 16 deletions pkg/controller/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,6 @@ func (controller *Controller) CreateVolume(ctx context.Context, req *csi.CreateV
// Extract the storage interface protocol to be used for this volume (iscsi, fc, sas, etc)
storageProtocol := storage.ValidateStorageProtocol(parameters[common.StorageProtocolKey])

klog.Infof("Create Volume -- Requisite Topology: %v", req.GetAccessibilityRequirements().GetRequisite())
klog.Infof("Create Volume -- Preferred Topology: %v", req.GetAccessibilityRequirements().GetPreferred())

//insert topology keys into the parameters map, so they will be available in ControllerPublishVolume
accessibleTopology, err := parseTopology(req.GetAccessibilityRequirements().GetRequisite(), storageProtocol, &parameters)
if err != nil {
klog.Errorf("parseTopology() returned error: %v", err)
return nil, err
}
klog.V(5).Infof("accessibleTopology: %v", accessibleTopology)

if !common.ValidateName(volumeName) {
return nil, status.Error(codes.InvalidArgument, "volume name contains invalid characters")
}
Expand Down Expand Up @@ -164,11 +153,10 @@ func (controller *Controller) CreateVolume(ctx context.Context, req *csi.CreateV

volume := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeId,
VolumeContext: parameters,
AccessibleTopology: accessibleTopology,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
ContentSource: req.GetVolumeContentSource(),
VolumeId: volumeId,
VolumeContext: parameters,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
ContentSource: req.GetVolumeContentSource(),
},
}

Expand Down
124 changes: 41 additions & 83 deletions pkg/controller/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,16 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/Seagate/seagate-exos-x-csi/pkg/common"
"github.com/Seagate/seagate-exos-x-csi/pkg/node_service"
pb "github.com/Seagate/seagate-exos-x-csi/pkg/node_service/node_servicepb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)

// getConnectorInfoPath
func (driver *Controller) getConnectorInfoPath(volumeID string) string {
return fmt.Sprintf("%s/%s.json", driver.runPath, volumeID)
}

// Read connector json file and return initiator address info for the given volume
func (driver *Controller) readInitiatorMapFromFile(filePath string, volumeID string) ([]string, error) {
klog.Infof("Reading initiator value for volume %v from file %v", volumeID, filePath)
f, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, err
}
initiatorMap := make(map[string][]string)
err = json.Unmarshal(f, &initiatorMap)
if err != nil {
return nil, fmt.Errorf("error unmarshaling initiator info file for specified volume ID %v", volumeID)
}
initiators, found := initiatorMap[volumeID]
if found {
return initiators, nil
} else {
return nil, fmt.Errorf("initiator value for volume ID %v not found", volumeID)
}
}

// PersistConnector persists the provided Connector to the specified file
func persistInitiatorMap(volumeID string, initiators []string, filePath string) error {
initiatorMap := map[string][]string{
volumeID: initiators,
}
f, err := os.Create(filePath)
if err != nil {
klog.Error("error encoding initiator info: %v", err)
return fmt.Errorf("error creating initiator map file %s: %s", filePath, err)
}
defer f.Close()
encoder := json.NewEncoder(f)
if err = encoder.Encode(initiatorMap); err != nil {
klog.Error("error encoding initiator info: %v", err)
return fmt.Errorf("error encoding initiator info: %v", err)
}
klog.Infof("wrote initiator persistence file at %s", filePath)
return nil
}

// ControllerPublishVolume attaches the given volume to the node
func (driver *Controller) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
Expand All @@ -71,35 +23,31 @@ func (driver *Controller) ControllerPublishVolume(ctx context.Context, req *csi.
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "cannot publish volume without capabilities")
}

nodeIP := req.GetNodeId()
parameters := req.GetVolumeContext()

initiatorMapNodeID := common.GetTopologyCompliantNodeID(req.GetNodeId())
var initiatorNames []string
var reqType pb.InitiatorType
switch parameters[common.StorageProtocolKey] {
case common.StorageProtocolSAS:
reqType = pb.InitiatorType_SAS
case common.StorageProtocolFC:
reqType = pb.InitiatorType_FC
case common.StorageProtocolISCSI:
reqType = pb.InitiatorType_ISCSI
}

// Available initiators for the node are provided in parameters through NodeGetInfo
if parameters[common.StorageProtocolKey] == common.StorageProtocolSAS {
for key, val := range parameters {
if strings.Contains(key, common.TopologySASInitiatorLabel) && strings.Contains(key, initiatorMapNodeID) {
initiatorNames = append(initiatorNames, val)
}
}
} else if parameters[common.StorageProtocolKey] == common.StorageProtocolFC {
for key, val := range parameters {
if strings.Contains(key, common.TopologyFCInitiatorLabel) && strings.Contains(key, initiatorMapNodeID) {
initiatorNames = append(initiatorNames, val)
}
}
} else {
initiatorNames = []string{req.GetNodeId()}
initiators, err := node_service.GetNodeInitiators(nodeIP, reqType)
David-T-White marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
klog.ErrorS(err, "error getting node initiators", "node-ip", nodeIP, "storage-protocol", reqType)
return nil, err
}

volumeName, _ := common.VolumeIdGetName(req.GetVolumeId())
persistentInfoFilepath := driver.getConnectorInfoPath(req.GetVolumeId())
persistInitiatorMap(volumeName, initiatorNames, persistentInfoFilepath)

klog.Infof("attach request for initiator(s) %v, volume id: %s", initiatorNames, volumeName)
klog.InfoS("attach request", "initiator(s)", initiators, "volume", volumeName)

lun, err := driver.client.PublishVolume(volumeName, initiatorNames)
lun, err := driver.client.PublishVolume(volumeName, initiators)

if err != nil {
return nil, err
Expand All @@ -119,17 +67,30 @@ func (driver *Controller) ControllerUnpublishVolume(ctx context.Context, req *cs
volumeName, _ := common.VolumeIdGetName(req.GetVolumeId())

var initiators []string
var err error
if protocol, _ := common.VolumeIdGetStorageProtocol(req.GetVolumeId()); protocol == common.StorageProtocolSAS {
initiators, err = driver.readInitiatorMapFromFile(driver.getConnectorInfoPath(req.GetVolumeId()), volumeName)
if err != nil {
return nil, fmt.Errorf("error retrieving initiator! cannot unpublish volume %v", volumeName)
}
} else {
initiators = []string{req.GetNodeId()}

nodeIP := req.GetNodeId()
storageProtocol, err := common.VolumeIdGetStorageProtocol(req.GetVolumeId())
if err != nil {
klog.ErrorS(err, "No storage protocol found in ControllerUnpublishVolume", "storage protocol", storageProtocol, "volume ID:", req.GetVolumeId())
return nil, err
}

var reqType pb.InitiatorType
switch storageProtocol {
case common.StorageProtocolSAS:
reqType = pb.InitiatorType_SAS
case common.StorageProtocolFC:
reqType = pb.InitiatorType_FC
case common.StorageProtocolISCSI:
reqType = pb.InitiatorType_ISCSI
}

klog.Infof("unmapping volume %s from initiator %s", volumeName, initiators)
initiators, err = node_service.GetNodeInitiators(nodeIP, reqType)
if err != nil {
klog.ErrorS(err, "error getting initiators from the node", "nodeIP", nodeIP, "storage-protocol", reqType)
}

klog.InfoS("unmapping volume from initiator", "volumeName", volumeName, "initiators", initiators)
for _, initiator := range initiators {
_, status, err := driver.client.UnmapVolume(volumeName, initiator)
if err != nil {
Expand All @@ -141,9 +102,6 @@ func (driver *Controller) ControllerUnpublishVolume(ctx context.Context, req *cs
}
}

persistentInfoFilepath := driver.getConnectorInfoPath(req.GetVolumeId())
os.Remove(persistentInfoFilepath)

klog.Infof("successfully unmapped volume %s from all initiators", volumeName)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
54 changes: 21 additions & 33 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/Seagate/csi-lib-iscsi/iscsi"
"github.com/Seagate/seagate-exos-x-csi/pkg/common"
"github.com/Seagate/seagate-exos-x-csi/pkg/node_service"
"github.com/Seagate/seagate-exos-x-csi/pkg/storage"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/wrappers"
Expand All @@ -26,6 +27,8 @@ type Node struct {

semaphore *semaphore.Weighted
runPath string
nodeName string
nodeIP string
}

// New is a convenience function for creating a node driver
Expand All @@ -34,10 +37,24 @@ func New() *Node {
iscsi.EnableDebugLogging(os.Stderr)
}

envNodeName, _ := os.LookupEnv(common.NodeNameEnvVar)
nodeIP, envFound := os.LookupEnv(common.NodeIPEnvVar)
if !envFound {
klog.InfoS("no Node IP found in environment. Using default")
nodeIP = "127.0.0.1"
}
envServicePort, envFound := os.LookupEnv(common.NodeServicePortEnvVar)
if !envFound {
klog.InfoS("no node service port found in environment. Using default")
envServicePort = "978"
}

node := &Node{
Driver: common.NewDriver(),
semaphore: semaphore.NewWeighted(1),
runPath: fmt.Sprintf("/var/run/%s", common.PluginName),
nodeName: envNodeName,
nodeIP: nodeIP,
}

if err := os.MkdirAll(node.runPath, 0755); err != nil {
Expand Down Expand Up @@ -96,46 +113,17 @@ func New() *Node {
csi.RegisterIdentityServer(node.Server, node)
csi.RegisterNodeServer(node.Server, node)

// initialize node communication service
go node_service.ListenAndServe(envServicePort)

return node
}

// NodeGetInfo returns info about the node
func (node *Node) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
initiatorName, err := readInitiatorName()
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

topology := map[string]string{}
sasAddresses, err := storage.GetSASInitiators()
if err != nil {
klog.Warningf("Error while searching for FC HBA Addresses: %s", err)
}
fcAddresses, err := storage.GetFCInitiators()
if err != nil {
klog.Warningf("Error while searching for FC HBA Addresses: %s", err)
}

for i, sasAddr := range sasAddresses {
//maximum value length 63 chars
topoKey := fmt.Sprintf("%s/%s-%d", common.TopologyInitiatorPrefix, common.TopologySASInitiatorLabel, i)
topology[topoKey] = sasAddr
}

for i, fcAddr := range fcAddresses {
topoKey := fmt.Sprintf("%s/%s-%d", common.TopologyInitiatorPrefix, common.TopologyFCInitiatorLabel, i)
topology[topoKey] = fcAddr
}

topology[common.TopologyNodeIDKey] = common.GetTopologyCompliantNodeID(initiatorName)

klog.Infof("Node Accessible Topology: %v", topology)
return &csi.NodeGetInfoResponse{
NodeId: initiatorName,
NodeId: node.nodeIP,
David-T-White marked this conversation as resolved.
Show resolved Hide resolved
MaxVolumesPerNode: 255,
AccessibleTopology: &csi.Topology{
Segments: topology,
},
}, nil
}

Expand Down
Loading