Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge CSI/Cluster Volumes code into Master #3022

Merged
merged 44 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
345a232
Add protos for cluster volumes
dperny May 15, 2020
3a1f960
Add foundational Volumes code.
dperny May 15, 2020
b0a1aa6
Add code for creating volumes
dperny Jul 20, 2020
ae4642e
Rename CSI volumes package
dperny Jul 27, 2020
3269992
Add implementation for NodeGetInfo
ameyag Jul 22, 2020
5e1b5a7
Add foundational test code for node agent
ameyag Jul 27, 2020
7c03852
Add CSI info test for agent
ameyag Jul 30, 2020
725e4f5
Add repeated VolumeAttachment to Task object
dperny Aug 19, 2020
c1163fa
Add node inventory tracking to CSI manager
dperny Jul 29, 2020
6dd79ba
Add code for determining if a volume is available
dperny Aug 10, 2020
7fc9dcc
Add volumeSet to scheduler
dperny Aug 26, 2020
887a874
Add scheduler volumes tests
dperny Sep 10, 2020
c95ca6d
Add basic dispatcher handling of Volumes
dperny Aug 27, 2020
e50be7c
Add code for preparing to publish volumes to nodes
dperny Sep 16, 2020
496ae49
Delay dispatching VolumeAssignment until ready
dperny Sep 29, 2020
725ebcc
Handle VolumeAssignment from dispatcher
ameyag Aug 6, 2020
0edfaa0
Update agent-side volume handling
dperny Oct 1, 2020
c43b931
Update volume dispatcher workflow
dperny Oct 5, 2020
ae520fd
Add freeing unused volumes
dperny Oct 7, 2020
3fd9b06
Add API fields for clean removal.
dperny Sep 14, 2020
251815c
Add VolumeEnforcer and respect VolumeAvailability
dperny Oct 13, 2020
89c54aa
Add AccessMode to VolumeAssignment
dperny Oct 22, 2020
860e4e1
Add removal and retrying volume operations
dperny Oct 15, 2020
19dd6c4
Add support for NodeGetCapabilities and foundation for RPC call
ameyag Nov 4, 2020
ebe7c69
Add rpc call
ameyag Nov 6, 2020
27dc3cc
Disable integration test for CSI plugin
ameyag Nov 16, 2020
f3fc22f
Use filepath instead of fmt for evaluating path
ameyag Nov 16, 2020
61dbf7f
Split plugin configuration and handle sending plugins
dperny Nov 4, 2020
a965651
Update agent csi package to pass tests.
dperny Nov 18, 2020
bcdebe6
Report volumes unpublished from agent
dperny Nov 25, 2020
512665f
Further refactor agent for removing volumes
dperny Dec 4, 2020
31632f2
Add support for volume secrets
dperny Dec 14, 2020
b12645f
Add plugin calls to RPCs
dperny Oct 28, 2020
e30e043
Add initialization logic to CSI manager
dperny Oct 27, 2020
03472df
enable volumes on manager
dperny Dec 15, 2020
b7ba512
Create gRPC connection in csi.Plugin
dperny Nov 10, 2020
3016f77
add fixes from testing in engine
dperny Dec 21, 2020
2960658
Add volume access type (mount or block) to spec
dperny Feb 22, 2021
d0176b4
Use PluginGetter instead of Cluster CSIConfig
dperny Jun 23, 2021
e1040b0
Use common fake PluginGetter
dperny Jun 30, 2021
cca3615
Fix tests
dperny Jul 1, 2021
2295a13
Merge pull request #3016 from dperny/feature-volumes-managed-plugin
dperny Jul 6, 2021
fb51b39
Fix template tests
dperny Jul 27, 2021
24aaa54
Merge pull request #3023 from dperny/feature-volumes-fix-template-tests
dperny Jul 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
36 changes: 35 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,40 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api
}
}

// ReportVolumeUnpublished tells the manager, via a Volume status update sent
// over the agent's session, that the volume identified by volumeID has been
// successfully unpublished. It waits for the report to finish or for ctx to
// be done, whichever happens first, and returns the corresponding error.
func (a *Agent) ReportVolumeUnpublished(ctx context.Context, volumeID string) error {
	logger := log.G(ctx).WithField("volume.ID", volumeID)
	logger.Debug("(*Agent).ReportVolumeUnpublished")

	// The context handed to the report call is canceled as soon as this
	// method returns.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Capacity of one lets the reporting goroutine deliver its result and
	// exit even when nobody is left to receive it.
	result := make(chan error, 1)

	startReport := func(session *session) error {
		go func() {
			reportErr := session.reportVolumeUnpublished(ctx, []string{volumeID})
			if reportErr == nil {
				logger.Debug("reported volume unpublished")
			} else {
				logger.WithError(reportErr).Error("error reporting volume unpublished")
			}

			result <- reportErr
		}()

		return nil
	}

	if err := a.withSession(ctx, startReport); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case reportErr := <-result:
		return reportErr
	}
}

// Publisher returns a LogPublisher for the given subscription
// as well as a cancel function that should be called when the log stream
// is completed.
Expand Down Expand Up @@ -597,8 +631,8 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP
func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) {
desc, err := a.config.Executor.Describe(ctx)

// Override hostname and TLS info
if desc != nil {
// Override hostname and TLS info
if a.config.Hostname != "" {
desc.Hostname = a.config.Hostname
}
Expand Down
121 changes: 121 additions & 0 deletions agent/csi/plugin/client_fake_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package plugin

import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
)

// fakeNodeClient is a test fake that exposes the CSI node RPC methods. Most
// of its methods append the incoming request to the matching log below so
// that tests can inspect which calls were made.
type fakeNodeClient struct {
	// stagedVolumes is a set of all volume IDs for which NodeStageVolume has been
	// called on this fake. NodeUnstageVolume removes the ID again.
	stagedVolumes map[string]struct{}

	// getInfoRequests is a log of all requests to NodeGetInfo.
	getInfoRequests []*csi.NodeGetInfoRequest
	// stageVolumeRequests is a log of all requests to NodeStageVolume.
	stageVolumeRequests []*csi.NodeStageVolumeRequest
	// unstageVolumeRequests is a log of all requests to NodeUnstageVolume.
	unstageVolumeRequests []*csi.NodeUnstageVolumeRequest
	// publishVolumeRequests is a log of all requests to NodePublishVolume.
	publishVolumeRequests []*csi.NodePublishVolumeRequest
	// unpublishVolumeRequests is a log of all requests to NodeUnpublishVolume.
	unpublishVolumeRequests []*csi.NodeUnpublishVolumeRequest
	// getCapabilitiesRequests is a log of all requests to NodeGetCapabilities.
	getCapabilitiesRequests []*csi.NodeGetCapabilitiesRequest
	// idCounter is a simple way to generate ids. It is incremented by every
	// method that logs its request.
	idCounter int
	// isStaging indicates if plugin supports stage/unstage capability
	isStaging bool
	// node ID is identifier for the node.
	nodeID string
}

// newFakeNodeClient builds a fakeNodeClient that reports the given nodeID and
// advertises the stage/unstage capability only when isStaging is true. Every
// request log and the staged-volume set start out empty but non-nil.
func newFakeNodeClient(isStaging bool, nodeID string) *fakeNodeClient {
	client := &fakeNodeClient{
		isStaging: isStaging,
		nodeID:    nodeID,
	}

	client.stagedVolumes = make(map[string]struct{})
	client.getInfoRequests = make([]*csi.NodeGetInfoRequest, 0)
	client.stageVolumeRequests = make([]*csi.NodeStageVolumeRequest, 0)
	client.unstageVolumeRequests = make([]*csi.NodeUnstageVolumeRequest, 0)
	client.publishVolumeRequests = make([]*csi.NodePublishVolumeRequest, 0)
	client.unpublishVolumeRequests = make([]*csi.NodeUnpublishVolumeRequest, 0)
	client.getCapabilitiesRequests = make([]*csi.NodeGetCapabilitiesRequest, 0)

	return client
}

// NodeGetInfo logs the request and answers with the node ID the fake was
// created with.
func (f *fakeNodeClient) NodeGetInfo(ctx context.Context, in *csi.NodeGetInfoRequest, _ ...grpc.CallOption) (*csi.NodeGetInfoResponse, error) {
	f.idCounter++
	f.getInfoRequests = append(f.getInfoRequests, in)

	resp := &csi.NodeGetInfoResponse{NodeId: f.nodeID}
	return resp, nil
}

// NodeStageVolume logs the request and marks the request's volume ID as
// staged. It always succeeds.
func (f *fakeNodeClient) NodeStageVolume(ctx context.Context, in *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csi.NodeStageVolumeResponse, error) {
	f.idCounter++
	f.stageVolumeRequests = append(f.stageVolumeRequests, in)

	volumeID := in.VolumeId
	f.stagedVolumes[volumeID] = struct{}{}

	resp := &csi.NodeStageVolumeResponse{}
	return resp, nil
}

// NodeUnstageVolume logs the request and removes the request's volume ID from
// the staged set. Unstaging a volume that is not currently staged yields a
// FailedPrecondition status error.
func (f *fakeNodeClient) NodeUnstageVolume(ctx context.Context, in *csi.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csi.NodeUnstageVolumeResponse, error) {
	f.idCounter++
	f.unstageVolumeRequests = append(f.unstageVolumeRequests, in)

	volumeID := in.VolumeId
	if _, staged := f.stagedVolumes[volumeID]; staged {
		delete(f.stagedVolumes, volumeID)
		return &csi.NodeUnstageVolumeResponse{}, nil
	}

	return nil, status.Error(codes.FailedPrecondition, "can't unstage volume that is not already staged")
}

// NodePublishVolume logs the request and always reports success.
func (f *fakeNodeClient) NodePublishVolume(ctx context.Context, in *csi.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csi.NodePublishVolumeResponse, error) {
	f.idCounter++
	f.publishVolumeRequests = append(f.publishVolumeRequests, in)

	resp := &csi.NodePublishVolumeResponse{}
	return resp, nil
}

// NodeUnpublishVolume logs the request and always reports success.
func (f *fakeNodeClient) NodeUnpublishVolume(ctx context.Context, in *csi.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csi.NodeUnpublishVolumeResponse, error) {
	f.idCounter++
	f.unpublishVolumeRequests = append(f.unpublishVolumeRequests, in)

	resp := &csi.NodeUnpublishVolumeResponse{}
	return resp, nil
}

// NodeGetVolumeStats is a stub: it records nothing and returns a nil response
// together with a nil error.
func (f *fakeNodeClient) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csi.NodeGetVolumeStatsResponse, error) {
	var noResponse *csi.NodeGetVolumeStatsResponse
	return noResponse, nil
}

// NodeExpandVolume is a stub: it records nothing and returns a nil response
// together with a nil error.
func (f *fakeNodeClient) NodeExpandVolume(ctx context.Context, in *csi.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csi.NodeExpandVolumeResponse, error) {
	var noResponse *csi.NodeExpandVolumeResponse
	return noResponse, nil
}

// NodeGetCapabilities logs the request and returns the fake's capabilities:
// the STAGE_UNSTAGE_VOLUME RPC capability when isStaging is set, and no
// capabilities otherwise.
func (f *fakeNodeClient) NodeGetCapabilities(ctx context.Context, in *csi.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csi.NodeGetCapabilitiesResponse, error) {
	f.idCounter++
	f.getCapabilitiesRequests = append(f.getCapabilitiesRequests, in)

	resp := &csi.NodeGetCapabilitiesResponse{}
	if !f.isStaging {
		return resp, nil
	}

	stageUnstage := &csi.NodeServiceCapability{
		Type: &csi.NodeServiceCapability_Rpc{
			Rpc: &csi.NodeServiceCapability_RPC{
				Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
			},
		},
	}
	resp.Capabilities = []*csi.NodeServiceCapability{stageUnstage}
	return resp, nil
}
119 changes: 119 additions & 0 deletions agent/csi/plugin/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package plugin

import (
"context"
"fmt"
"sync"

"github.com/docker/docker/pkg/plugingetter"

"github.com/docker/swarmkit/api"
)

// DockerCSIPluginCap is the capability name passed to the PluginGetter so
// that only the plugins we need are returned. The full name of the plugin
// interface is "swarm.csiplugin/1.0".
const DockerCSIPluginCap = "csiplugin"

// PluginManager manages the multiple CSI plugins that may be in use on the
// node. PluginManager should be thread-safe.
type PluginManager interface {
	// Get gets the plugin with the given name. An error is returned when the
	// plugin cannot be obtained.
	Get(name string) (NodePlugin, error)

	// NodeInfo returns the NodeCSIInfo for every active plugin.
	NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error)
}

// pluginManager is the PluginManager implementation returned by
// NewPluginManager. It caches the node plugins it has created, keyed by
// plugin name.
type pluginManager struct {
	// plugins holds the node plugins created so far, keyed by plugin name.
	plugins map[string]NodePlugin
	// pluginsMu must be held while reading or writing plugins.
	pluginsMu sync.Mutex

	// newNodePluginFunc usually points to NewNodePlugin. However, for testing,
	// NewNodePlugin can be swapped out with a function that creates fake node
	// plugins
	newNodePluginFunc func(string, plugingetter.CompatPlugin, plugingetter.PluginAddr, SecretGetter) NodePlugin

	// secrets is a SecretGetter for use by node plugins.
	secrets SecretGetter

	// pg is used to look up plugins by name and to list the managed plugins
	// that have the DockerCSIPluginCap capability.
	pg plugingetter.PluginGetter
}

// NewPluginManager creates a PluginManager that looks plugins up through pg
// and hands secrets to every node plugin it creates. Node plugins are built
// with NewNodePlugin, and the manager starts with no plugins loaded.
func NewPluginManager(pg plugingetter.PluginGetter, secrets SecretGetter) PluginManager {
	manager := &pluginManager{
		pg:      pg,
		secrets: secrets,
	}
	manager.plugins = make(map[string]NodePlugin)
	manager.newNodePluginFunc = NewNodePlugin
	return manager
}

// Get returns the node plugin registered under name, loading it first when it
// has not been loaded yet. The plugins lock is held for the whole call. If
// the plugin cannot be obtained, a nil plugin and a descriptive error are
// returned.
func (pm *pluginManager) Get(name string) (NodePlugin, error) {
	pm.pluginsMu.Lock()
	defer pm.pluginsMu.Unlock()

	found, lookupErr := pm.getPlugin(name)
	if lookupErr == nil {
		return found, nil
	}

	return nil, fmt.Errorf("cannot get plugin %v: %v", name, lookupErr)
}

// NodeInfo collects the NodeCSIInfo of every plugin known to the manager.
// Before collecting, it makes sure every managed plugin that has the
// DockerCSIPluginCap capability has been loaded. Plugins whose NodeGetInfo
// call fails are left out of the result; the returned error is always nil.
func (pm *pluginManager) NodeInfo(ctx context.Context) ([]*api.NodeCSIInfo, error) {
	// TODO(dperny): do not acquire this lock for the duration of the
	// function call. that's too long and too blocking.
	pm.pluginsMu.Lock()
	defer pm.pluginsMu.Unlock()

	// Make sure every plugin is initialized by walking all current plugins
	// that have DockerCSIPluginCap.
	for _, managed := range pm.pg.GetAllManagedPluginsByCap(DockerCSIPluginCap) {
		// TODO(dperny): use this opportunity to drop plugins that we're
		// tracking but which no longer exist.

		// Only the side effect of loading the plugin matters here, so both
		// the plugin and any load error are deliberately discarded.
		_, _ = pm.getPlugin(managed.Name())
	}

	infos := make([]*api.NodeCSIInfo, 0, len(pm.plugins))
	for _, loaded := range pm.plugins {
		// Any plugin that returns an error is skipped.
		if info, err := loaded.NodeGetInfo(ctx); err == nil {
			infos = append(infos, info)
		}
	}
	return infos, nil
}

// getPlugin returns the plugin with the specified name. A plugin that has
// already been loaded is returned as-is; otherwise it is looked up through
// the PluginGetter, wrapped in a new NodePlugin, and remembered for later
// calls. An error is returned if the lookup fails or if the plugin found does
// not implement plugingetter.PluginAddr.
//
// pm.pluginsMu must be obtained before calling this method.
func (pm *pluginManager) getPlugin(name string) (NodePlugin, error) {
	if cached, known := pm.plugins[name]; known {
		return cached, nil
	}

	compat, err := pm.pg.Get(name, DockerCSIPluginCap, plugingetter.Lookup)
	if err != nil {
		return nil, err
	}

	addr, hasAddr := compat.(plugingetter.PluginAddr)
	if !hasAddr {
		return nil, fmt.Errorf("plugin does not implement PluginAddr interface")
	}

	created := pm.newNodePluginFunc(name, compat, addr, pm.secrets)
	pm.plugins[name] = created
	return created, nil
}
80 changes: 80 additions & 0 deletions agent/csi/plugin/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package plugin

import (
"context"
"net"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/docker/swarmkit/testutils"
)

// Specs for pluginManager, run against a fake PluginGetter and fake node
// plugins.
var _ = Describe("PluginManager", func() {
	var (
		pm *pluginManager
		pg *testutils.FakePluginGetter
	)

	// registerPlugin adds a fake plugin called name to pg, listening on a
	// unix address whose Name is addrName.
	registerPlugin := func(name, addrName string) {
		pg.Plugins[name] = &testutils.FakeCompatPlugin{
			PluginName: name,
			PluginAddr: &net.UnixAddr{
				Net:  "unix",
				Name: addrName,
			},
		}
	}

	BeforeEach(func() {
		pg = &testutils.FakePluginGetter{
			Plugins: map[string]*testutils.FakeCompatPlugin{},
		}

		pm = &pluginManager{
			plugins:           map[string]NodePlugin{},
			newNodePluginFunc: newFakeNodePlugin,
			pg:                pg,
		}

		registerPlugin("plug1", "")
		// NOTE(review): the "fail" address presumably makes the fake node
		// plugin's NodeGetInfo return an error; confirm against
		// newFakeNodePlugin. The NodeInfo spec below expects plug2 to be
		// absent from the result.
		registerPlugin("plug2", "fail")
		registerPlugin("plug3", "")
	})

	Describe("Get", func() {
		It("should return the requested plugin", func() {
			found, err := pm.Get("plug1")
			Expect(err).ToNot(HaveOccurred())
			Expect(found).ToNot(BeNil())
		})

		It("should return an error if no plugin can be found", func() {
			found, err := pm.Get("plugNotHere")
			Expect(err).To(HaveOccurred())
			Expect(found).To(BeNil())
		})
	})

	Describe("NodeInfo", func() {
		It("should return NodeCSIInfo for every active plugin", func() {
			infos, err := pm.NodeInfo(context.Background())
			Expect(err).ToNot(HaveOccurred())

			names := make([]string, 0, len(infos))
			for idx := range infos {
				names = append(names, infos[idx].PluginName)
			}

			Expect(names).To(ConsistOf("plug1", "plug3"))
		})
	})
})
Loading