diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index a2621c05f1c..28e626e8cc6 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -2,6 +2,7 @@ package dynamicplugins import ( "context" + "fmt" "sync" "testing" "time" @@ -221,6 +222,129 @@ func TestDynamicRegistry_StateStore(t *testing.T) { require.NoError(t, err) } +func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { + + t.Parallel() + dispenseFn := func(i *PluginInfo) (interface{}, error) { + return i, nil + } + + newPlugin := func(idx int) *PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &PluginInfo{ + Name: "my-plugin", + Type: PluginTypeCSINode, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } + } + + dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo { + result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin") + require.NotNil(t, result) + require.NoError(t, err) + plugin := result.(*PluginInfo) + return plugin + } + + t.Run("restore races on client restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations, which runs the + // plugin_supervisor_hook. But there's a race and the allocations + // in this scenario are Restored in the opposite order they were + // created + + require.NoError(t, newR.RegisterPlugin(plugin0)) + plugin = dispensePlugin(t, newR) + // TODO: this currently fails because the RestoreTask races + // between the two allocations and the old plugin is overwritten + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + plugin2 := newPlugin(2) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host + // + // TODO: csi_hooks fail in this window because we'll send to a + // socket no one is listening on! We won't be able to + // unpublish either! + + // server gives us a replacement alloc + + require.NoError(t, newR.RegisterPlugin(plugin2)) + plugin = dispensePlugin(t, newR) + require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-2", plugin.AllocID) + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + require.NoError(t, reg.RegisterPlugin(plugin0)) + + // replacement is registered before old plugin deregisters + require.NoError(t, reg.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + + reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin") + // TODO: this currently fails because the Deregister lost the race + // and removes the plugin outright, leaving no running plugin + plugin = dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + +} + // MemDB implements a StateDB that stores data in memory and should only be // used for testing. All methods are safe for concurrent use. This is a // partial implementation of the MemDB in the client/state package, copied diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index f6c3f381dec..47b8c2a187e 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -1,6 +1,8 @@ package csimanager import ( + "fmt" + "sync" "testing" "time" @@ -13,42 +15,35 @@ import ( var _ pluginmanager.PluginManager = (*csiManager)(nil) -var fakePlugin = &dynamicplugins.PluginInfo{ - Name: "my-plugin", - Type: "csi-controller", - ConnectionInfo: &dynamicplugins.PluginConnectionInfo{}, +func fakePlugin(idx int, pluginType string) *dynamicplugins.PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &dynamicplugins.PluginInfo{ + Name: "my-plugin", + Type: pluginType, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &dynamicplugins.PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } } -func setupRegistry() dynamicplugins.Registry { +func setupRegistry(reg *MemDB) dynamicplugins.Registry { return dynamicplugins.NewRegistry( - nil, + reg, map[string]dynamicplugins.PluginDispenser{ - "csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) { - return nil, nil + "csi-controller": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil + }, + "csi-node": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil }, }) } -func TestManager_Setup_Shutdown(t *testing.T) { - r := setupRegistry() - defer r.Shutdown() - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: r, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - } - pm := New(cfg).(*csiManager) - pm.Run() - pm.Shutdown() -} - func TestManager_RegisterPlugin(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -57,30 +52,27 @@ func TestManager_RegisterPlugin(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - pmap, ok := pm.instances[fakePlugin.Type] + pmap, ok := pm.instances[plugin.Type] if !ok { return false } - _, ok = pmap[fakePlugin.Name] + _, ok = pmap[plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) } func TestManager_DeregisterPlugin(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -90,23 +82,22 @@ func TestManager_DeregisterPlugin(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(plugin.Type, plugin.Name) + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } @@ -115,11 +106,9 @@ func TestManager_DeregisterPlugin(t *testing.T) { // name but different types (as found with monolith plugins) don't interfere // with each other. func TestManager_MultiplePlugins(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -129,33 +118,159 @@ func TestManager_MultiplePlugins(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + controllerPlugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(controllerPlugin) + require.NoError(t, err) - fakeNodePlugin := *fakePlugin - fakeNodePlugin.Type = "csi-node" - err = registry.RegisterPlugin(&fakeNodePlugin) - require.Nil(t, err) + nodePlugin := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + err = registry.RegisterPlugin(nodePlugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { - _, ok := pm.instances[fakeNodePlugin.Type][fakeNodePlugin.Name] + _, ok := pm.instances[nodePlugin.Type][nodePlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name) + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } + +// TestManager_ConcurrentPlugins exercises the behavior when multiple +// allocations for the same plugin interact +func TestManager_ConcurrentPlugins(t *testing.T) { + + testManager := func(registry dynamicplugins.Registry) *csiManager { + return New(&Config{ + Logger: testlog.HCLogger(t), + DynamicRegistry: registry, + UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, + PluginResyncPeriod: time.Hour, // don't resync except via events + }).(*csiManager) + } + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + plugin2 := fakePlugin(2, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + pm := testManager(registry) + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + pm.Shutdown() + registry.Shutdown() + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + registry = setupRegistry(db) + defer registry.Shutdown() + pm = testManager(registry) + defer pm.Shutdown() + pm.Run() + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload") + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host + // + // TODO: csi_hooks fail in this window because we'll send to a + // socket no one is listening on! We won't be able to + // unpublish either! + + // server gives us a replacement alloc + + require.NoError(t, registry.RegisterPlugin(plugin2)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" && + im.allocID == "alloc-2" + }, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement") + + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + defer registry.Shutdown() + + pm := testManager(registry) + defer pm.Shutdown() + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin") + + // TODO: this currently fails because the Deregister lost the race + // and removes the plugin outright, leaving no running plugin + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin") + }) +} + +// MemDB implements a StateDB that stores data in memory and should only be +// used for testing. All methods are safe for concurrent use. This is a +// partial implementation of the MemDB in the client/state package, copied +// here to avoid circular dependencies. +type MemDB struct { + dynamicManagerPs *dynamicplugins.RegistryState + mu sync.RWMutex +} + +func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + if m == nil { + return nil, nil + } + m.mu.Lock() + defer m.mu.Unlock() + return m.dynamicManagerPs, nil +} + +func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + if m == nil { + return nil + } + m.mu.Lock() + defer m.mu.Unlock() + m.dynamicManagerPs = ps + return nil +}