From 0a46a31aeb968ad41c80f0c45cacf48605b297a6 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 16 Feb 2022 11:43:00 -0500 Subject: [PATCH] csi: failing tests for concurrency bugs in plugin registration The dynamic plugin registry assumes that plugins are singletons, which matches the behavior of other Nomad plugins. But because dynamic plugins like CSI are implemented by allocations, we need to handle the possibility of multiple allocations for a given plugin type + ID, as well as behaviors around interleaved allocation starts and stops. --- client/dynamicplugins/registry_test.go | 124 ++++++++++ .../pluginmanager/csimanager/manager_test.go | 231 +++++++++++++----- 2 files changed, 297 insertions(+), 58 deletions(-) diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index a2621c05f1c..28e626e8cc6 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -2,6 +2,7 @@ package dynamicplugins import ( "context" + "fmt" "sync" "testing" "time" @@ -221,6 +222,129 @@ func TestDynamicRegistry_StateStore(t *testing.T) { require.NoError(t, err) } +func TestDynamicRegistry_ConcurrentAllocs(t *testing.T) { + + t.Parallel() + dispenseFn := func(i *PluginInfo) (interface{}, error) { + return i, nil + } + + newPlugin := func(idx int) *PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &PluginInfo{ + Name: "my-plugin", + Type: PluginTypeCSINode, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } + } + + dispensePlugin := func(t *testing.T, reg Registry) *PluginInfo { + result, err := reg.DispensePlugin(PluginTypeCSINode, "my-plugin") + require.NotNil(t, result) + require.NoError(t, err) + plugin := result.(*PluginInfo) + return plugin + } + + t.Run("restore races on client restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations, which runs the + // plugin_supervisor_hook. But there's a race and the allocations + // in this scenario are Restored in the opposite order they were + // created + + require.NoError(t, newR.RegisterPlugin(plugin0)) + plugin = dispensePlugin(t, newR) + // TODO: this currently fails because the RestoreTask races + // between the two allocations and the old plugin is overwritten + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + plugin2 := newPlugin(2) + + memdb := &MemDB{} + oldR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + // add a plugin and a new alloc running the same plugin + // (without stopping the old one) + require.NoError(t, oldR.RegisterPlugin(plugin0)) + require.NoError(t, oldR.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, oldR) + require.Equal(t, "alloc-1", plugin.AllocID) + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + newR := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + plugin = dispensePlugin(t, oldR) + require.Equal(t, "/var/data/alloc/alloc-1/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-1", plugin.AllocID) + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host + // + // TODO: csi_hooks fail in this window because we'll send to a + // socket no one is listening on! We won't be able to + // unpublish either! + + // server gives us a replacement alloc + + require.NoError(t, newR.RegisterPlugin(plugin2)) + plugin = dispensePlugin(t, newR) + require.Equal(t, "/var/data/alloc/alloc-2/csi.sock", plugin.ConnectionInfo.SocketPath) + require.Equal(t, "alloc-2", plugin.AllocID) + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := newPlugin(0) + plugin1 := newPlugin(1) + + memdb := &MemDB{} + reg := NewRegistry(memdb, map[string]PluginDispenser{PluginTypeCSINode: dispenseFn}) + + require.NoError(t, reg.RegisterPlugin(plugin0)) + + // replacement is registered before old plugin deregisters + require.NoError(t, reg.RegisterPlugin(plugin1)) + plugin := dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + + reg.DeregisterPlugin(PluginTypeCSINode, "my-plugin") + // TODO: this currently fails because the Deregister lost the race + // and removes the plugin outright, leaving no running plugin + plugin = dispensePlugin(t, reg) + require.Equal(t, "alloc-1", plugin.AllocID) + }) + +} + // MemDB implements a StateDB that stores data in memory and should only be // used for testing. All methods are safe for concurrent use. This is a // partial implementation of the MemDB in the client/state package, copied diff --git a/client/pluginmanager/csimanager/manager_test.go b/client/pluginmanager/csimanager/manager_test.go index f6c3f381dec..47b8c2a187e 100644 --- a/client/pluginmanager/csimanager/manager_test.go +++ b/client/pluginmanager/csimanager/manager_test.go @@ -1,6 +1,8 @@ package csimanager import ( + "fmt" + "sync" "testing" "time" @@ -13,42 +15,35 @@ import ( var _ pluginmanager.PluginManager = (*csiManager)(nil) -var fakePlugin = &dynamicplugins.PluginInfo{ - Name: "my-plugin", - Type: "csi-controller", - ConnectionInfo: &dynamicplugins.PluginConnectionInfo{}, +func fakePlugin(idx int, pluginType string) *dynamicplugins.PluginInfo { + id := fmt.Sprintf("alloc-%d", idx) + return &dynamicplugins.PluginInfo{ + Name: "my-plugin", + Type: pluginType, + Version: fmt.Sprintf("v%d", idx), + ConnectionInfo: &dynamicplugins.PluginConnectionInfo{ + SocketPath: "/var/data/alloc/" + id + "/csi.sock"}, + AllocID: id, + } } -func setupRegistry() dynamicplugins.Registry { +func setupRegistry(reg *MemDB) dynamicplugins.Registry { return dynamicplugins.NewRegistry( - nil, + reg, map[string]dynamicplugins.PluginDispenser{ - "csi-controller": func(*dynamicplugins.PluginInfo) (interface{}, error) { - return nil, nil + "csi-controller": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil + }, + "csi-node": func(i *dynamicplugins.PluginInfo) (interface{}, error) { + return i, nil }, }) } -func TestManager_Setup_Shutdown(t *testing.T) { - r := setupRegistry() - defer r.Shutdown() - - cfg := &Config{ - Logger: testlog.HCLogger(t), - DynamicRegistry: r, - UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, - } - pm := New(cfg).(*csiManager) - pm.Run() - pm.Shutdown() -} - func TestManager_RegisterPlugin(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -57,30 +52,27 @@ func TestManager_RegisterPlugin(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - pmap, ok := pm.instances[fakePlugin.Type] + pmap, ok := pm.instances[plugin.Type] if !ok { return false } - _, ok = pmap[fakePlugin.Name] + _, ok = pmap[plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) } func TestManager_DeregisterPlugin(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -90,23 +82,22 @@ func TestManager_DeregisterPlugin(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + plugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(plugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(plugin.Type, plugin.Name) + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[plugin.Type][plugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } @@ -115,11 +106,9 @@ func TestManager_DeregisterPlugin(t *testing.T) { // name but different types (as found with monolith plugins) don't interfere // with each other. func TestManager_MultiplePlugins(t *testing.T) { - registry := setupRegistry() + registry := setupRegistry(nil) defer registry.Shutdown() - require.NotNil(t, registry) - cfg := &Config{ Logger: testlog.HCLogger(t), DynamicRegistry: registry, @@ -129,33 +118,159 @@ func TestManager_MultiplePlugins(t *testing.T) { pm := New(cfg).(*csiManager) defer pm.Shutdown() - require.NotNil(t, pm.registry) - - err := registry.RegisterPlugin(fakePlugin) - require.Nil(t, err) + controllerPlugin := fakePlugin(0, dynamicplugins.PluginTypeCSIController) + err := registry.RegisterPlugin(controllerPlugin) + require.NoError(t, err) - fakeNodePlugin := *fakePlugin - fakeNodePlugin.Type = "csi-node" - err = registry.RegisterPlugin(&fakeNodePlugin) - require.Nil(t, err) + nodePlugin := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + err = registry.RegisterPlugin(nodePlugin) + require.NoError(t, err) pm.Run() require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { - _, ok := pm.instances[fakeNodePlugin.Type][fakeNodePlugin.Name] + _, ok := pm.instances[nodePlugin.Type][nodePlugin.Name] return ok }, 5*time.Second, 10*time.Millisecond) - err = registry.DeregisterPlugin(fakePlugin.Type, fakePlugin.Name) - require.Nil(t, err) + err = registry.DeregisterPlugin(controllerPlugin.Type, controllerPlugin.Name) + require.NoError(t, err) require.Eventually(t, func() bool { - _, ok := pm.instances[fakePlugin.Type][fakePlugin.Name] + _, ok := pm.instances[controllerPlugin.Type][controllerPlugin.Name] return !ok }, 5*time.Second, 10*time.Millisecond) } + +// TestManager_ConcurrentPlugins exercises the behavior when multiple +// allocations for the same plugin interact +func TestManager_ConcurrentPlugins(t *testing.T) { + + testManager := func(registry dynamicplugins.Registry) *csiManager { + return New(&Config{ + Logger: testlog.HCLogger(t), + DynamicRegistry: registry, + UpdateNodeCSIInfoFunc: func(string, *structs.CSIInfo) {}, + PluginResyncPeriod: time.Hour, // don't resync except via events + }).(*csiManager) + } + + t.Run("replacement races on host restart", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + plugin2 := fakePlugin(2, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + pm := testManager(registry) + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + pm.Shutdown() + registry.Shutdown() + + // client restarts and we load state from disk. + // most recently inserted plugin is current + + registry = setupRegistry(db) + defer registry.Shutdown() + pm = testManager(registry) + defer pm.Shutdown() + pm.Run() + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin was not active after state reload") + + // RestoreTask fires for all allocations but none of them are + // running because we restarted the whole host + // + // TODO: csi_hooks fail in this window because we'll send to a + // socket no one is listening on! We won't be able to + // unpublish either! + + // server gives us a replacement alloc + + require.NoError(t, registry.RegisterPlugin(plugin2)) + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-2/csi.sock" && + im.allocID == "alloc-2" + }, 5*time.Second, 10*time.Millisecond, "alloc-2 plugin was not active after replacement") + + }) + + t.Run("interleaved register and deregister", func(t *testing.T) { + plugin0 := fakePlugin(0, dynamicplugins.PluginTypeCSINode) + plugin1 := fakePlugin(1, dynamicplugins.PluginTypeCSINode) + + db := &MemDB{} + registry := setupRegistry(db) + defer registry.Shutdown() + + pm := testManager(registry) + defer pm.Shutdown() + pm.Run() + + require.NoError(t, registry.RegisterPlugin(plugin0)) + require.NoError(t, registry.RegisterPlugin(plugin1)) + + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" && + im.allocID == "alloc-1" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin did not become active plugin") + + registry.DeregisterPlugin(dynamicplugins.PluginTypeCSINode, "my-plugin") + + // TODO: this currently fails because the Deregister lost the race + // and removes the plugin outright, leaving no running plugin + require.Eventuallyf(t, func() bool { + im, _ := pm.instances[plugin0.Type][plugin0.Name] + return im != nil && + im.info.ConnectionInfo.SocketPath == "/var/data/alloc/alloc-1/csi.sock" + }, 5*time.Second, 10*time.Millisecond, "alloc-1 plugin should still be active plugin") + }) +} + +// MemDB implements a StateDB that stores data in memory and should only be +// used for testing. All methods are safe for concurrent use. This is a +// partial implementation of the MemDB in the client/state package, copied +// here to avoid circular dependencies. +type MemDB struct { + dynamicManagerPs *dynamicplugins.RegistryState + mu sync.RWMutex +} + +func (m *MemDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState, error) { + if m == nil { + return nil, nil + } + m.mu.Lock() + defer m.mu.Unlock() + return m.dynamicManagerPs, nil +} + +func (m *MemDB) PutDynamicPluginRegistryState(ps *dynamicplugins.RegistryState) error { + if m == nil { + return nil + } + m.mu.Lock() + defer m.mu.Unlock() + m.dynamicManagerPs = ps + return nil +}