From 03bd16d7b1d3c7ae9320aac908c1b32a2f4e60ae Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Mon, 26 Feb 2018 16:54:21 -0800 Subject: [PATCH] Make Port Allocator idempotent for GameServers and Node events The port allocator wasn't idempotent for GameServer delete events and Node addition events. This could have resulted in allocated ports being assigned, or having too many ports available as allocatable. --- pkg/gameservers/controller.go | 8 +- pkg/gameservers/portallocator.go | 122 +++++++++++++------- pkg/gameservers/portallocator_test.go | 153 ++++++++++++++++---------- 3 files changed, 181 insertions(+), 102 deletions(-) diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 0c3d8f0dda..9f74df12a0 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -313,13 +313,11 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe return gs, nil } - gsCopy := gs.DeepCopy() - - port, err := c.portAllocator.Allocate() + gsCopy, err := c.portAllocator.Allocate(gs.DeepCopy()) if err != nil { return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name) } - gsCopy.Spec.HostPort = port + gsCopy.Status.State = stablev1alpha1.Creating c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated") @@ -328,7 +326,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe if err != nil { // if the GameServer doesn't get updated with the port data, then put the port // back in the pool, as it will get retried on the next pass - c.portAllocator.DeAllocate(port) + c.portAllocator.DeAllocate(gsCopy) return gs, errors.Wrapf(err, "error updating GameServer %s to default values", gs.Name) } diff --git a/pkg/gameservers/portallocator.go b/pkg/gameservers/portallocator.go index 67be8d1a57..d440f4b340 100644 --- a/pkg/gameservers/portallocator.go +++ b/pkg/gameservers/portallocator.go @@ -25,6 +25,7 @@ import ( "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -45,6 +46,8 @@ type PortAllocator struct { logger *logrus.Entry mutex sync.RWMutex portAllocations []portAllocation + gameServerRegistry map[types.UID]bool + nodeRegistry map[types.UID]bool minPort int32 maxPort int32 gameServerSynced cache.InformerSynced @@ -70,6 +73,8 @@ func NewPortAllocator(minPort, maxPort int32, mutex: sync.RWMutex{}, minPort: minPort, maxPort: maxPort, + gameServerRegistry: map[types.UID]bool{}, + nodeRegistry: map[types.UID]bool{}, gameServerSynced: gameServers.Informer().HasSynced, gameServerLister: gameServers.Lister(), gameServerInformer: gameServers.Informer(), @@ -79,14 +84,6 @@ func NewPortAllocator(minPort, maxPort int32, } pa.logger = runtime.NewLoggerWithType(pa) - pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting") - return pa -} - -// Run sets up the current state of port allocations and -// starts tracking Pod and Node changes -func (pa *PortAllocator) Run(stop <-chan struct{}) error { - pa.logger.Info("Running") pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: pa.syncDeleteGameServer, }) @@ -98,7 +95,7 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error { oldNode := oldObj.(*corev1.Node) newNode := newObj.(*corev1.Node) if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable { - err := pa.syncPortAllocations(stop) + err := pa.syncPortAllocations() if err != nil { err := errors.Wrap(err, "error resetting ports on node update") runtime.HandleError(pa.logger.WithField("node", newNode), err) @@ -106,7 +103,7 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error { } }, DeleteFunc: func(obj interface{}) { - err := pa.syncPortAllocations(stop) + err := pa.syncPortAllocations() if err != nil { err := errors.Wrap(err, "error on node deletion") runtime.HandleError(pa.logger.WithField("node", obj), err) @@ -114,45 +111,77 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error { }, }) - pa.logger.Info("Flush cache sync, before syncing gameserver and node state") + pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting") + return pa +} + +// Run sets up the current state of port allocations and +// starts tracking Pod and Node changes (non blocking) +func (pa *PortAllocator) Run(stop <-chan struct{}) error { + pa.logger.Info("Running") + if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { - return nil + return errors.New("failed to wait for caches to sync") } - return pa.syncPortAllocations(stop) + return pa.syncPortAllocations() } -// Allocate allocates a port. Return ErrPortNotFound if no port is -// allocatable -func (pa *PortAllocator) Allocate() (int32, error) { +// Allocate assigns a port to the GameServer and returns it. +// Return ErrPortNotFound if no port is allocatable +func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer, error) { + if gs.Spec.PortPolicy != v1alpha1.Dynamic { + return gs, errors.Errorf("Port policy of %s is not supported for port allocation", gs.Spec.PortPolicy) + } + pa.mutex.Lock() defer pa.mutex.Unlock() for _, n := range pa.portAllocations { for p, taken := range n { if !taken { n[p] = true - return p, nil + pa.gameServerRegistry[gs.ObjectMeta.UID] = true + gs.Spec.HostPort = p + return gs, nil } } } - return -1, ErrPortNotFound + return gs, ErrPortNotFound } // DeAllocate marks the given port as no longer allocated -func (pa *PortAllocator) DeAllocate(port int32) { +func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) { + if gs.Spec.PortPolicy != v1alpha1.Dynamic { + return + } + if gs.Spec.HostPort < pa.minPort || gs.Spec.HostPort > pa.maxPort { + return + } + // skip if it wasn't previously allocated + if _, ok := pa.gameServerRegistry[gs.ObjectMeta.UID]; !ok { + pa.logger.WithField("gs", gs.ObjectMeta.Name). + Info("Did not allocate this GameServer. Ignoring for DeAllocation") + return + } pa.mutex.Lock() defer pa.mutex.Unlock() - pa.portAllocations = setPortAllocation(port, pa.portAllocations, false) + pa.portAllocations = setPortAllocation(gs.Spec.HostPort, pa.portAllocations, false) + delete(pa.gameServerRegistry, gs.ObjectMeta.UID) } // syncAddNode adds another node port section // to the available ports func (pa *PortAllocator) syncAddNode(obj interface{}) { - pa.mutex.Lock() - defer pa.mutex.Unlock() - node := obj.(*corev1.Node) + // if we're already added this node, don't do it again + if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok { + pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping") + return + } + pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations") + pa.mutex.Lock() + defer pa.mutex.Unlock() ports := portAllocation{} for i := pa.minPort; i <= pa.maxPort; i++ { @@ -160,14 +189,16 @@ func (pa *PortAllocator) syncAddNode(obj interface{}) { } pa.portAllocations = append(pa.portAllocations, ports) + pa.nodeRegistry[node.ObjectMeta.UID] = true } // syncDeleteGameServer when a GameServer Pod is deleted // make the HostPort available func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { - gs := object.(*v1alpha1.GameServer) - pa.logger.WithField("gs", gs).Info("syncing deleted GameServer") - pa.DeAllocate(gs.Spec.HostPort) + if gs, ok := object.(*v1alpha1.GameServer); ok { + pa.logger.WithField("gs", gs).Info("syncing deleted GameServer") + pa.DeAllocate(gs) + } } // syncPortAllocations syncs the pod, node and gameserver caches then @@ -176,39 +207,41 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { // portAllocations are marked as taken. // Locks the mutex while doing this. // This is basically a stop the world Garbage Collection on port allocations. -func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error { +func (pa *PortAllocator) syncPortAllocations() error { pa.mutex.Lock() defer pa.mutex.Unlock() pa.logger.Info("Resetting Port Allocation") - if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { - return nil - } - nodes, err := pa.nodeLister.List(labels.Everything()) if err != nil { return errors.Wrap(err, "error listing all nodes") } // setup blank port values - nodePorts := pa.nodePortAllocation(nodes) + nodePorts, nodeRegistry := pa.nodePortAllocation(nodes) gameservers, err := pa.gameServerLister.List(labels.Everything()) if err != nil { return errors.Wrapf(err, "error listing all GameServers") } + gsRegistry := map[types.UID]bool{} + // place to put GameServer port allocations that are not ready yet/after the ready state var nonReadyNodesPorts []int32 // Check GameServers as well, as some for _, gs := range gameservers { - // if the node doesn't exist, it's likely unscheduled - _, ok := nodePorts[gs.Status.NodeName] - if gs.Status.NodeName != "" && ok { - nodePorts[gs.Status.NodeName][gs.Status.Port] = true - } else if gs.Spec.HostPort != 0 { - nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort) + if gs.Spec.PortPolicy == v1alpha1.Dynamic { + gsRegistry[gs.ObjectMeta.UID] = true + + // if the node doesn't exist, it's likely unscheduled + _, ok := nodePorts[gs.Status.NodeName] + if gs.Status.NodeName != "" && ok { + nodePorts[gs.Status.NodeName][gs.Status.Port] = true + } else if gs.Spec.HostPort != 0 { + nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort) + } } } @@ -229,15 +262,21 @@ func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error { } pa.portAllocations = allocations + pa.gameServerRegistry = gsRegistry + pa.nodeRegistry = nodeRegistry return nil } // nodePortAllocation returns a map of port allocations all set to being available -// with a map key for each node -func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation { +// with a map key for each node, as well as the node registry record (since we're already looping) +func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) (map[string]portAllocation, map[types.UID]bool) { nodePorts := map[string]portAllocation{} + nodeRegistry := map[types.UID]bool{} + for _, n := range nodes { + nodeRegistry[n.ObjectMeta.UID] = true + // ignore unschedulable nodes if !n.Spec.Unschedulable { nodePorts[n.Name] = portAllocation{} @@ -246,7 +285,8 @@ func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]por } } } - return nodePorts + + return nodePorts, nodeRegistry } // setPortAllocation takes a port from an all diff --git a/pkg/gameservers/portallocator_test.go b/pkg/gameservers/portallocator_test.go index cb1bb9031d..d849104006 100644 --- a/pkg/gameservers/portallocator_test.go +++ b/pkg/gameservers/portallocator_test.go @@ -32,13 +32,14 @@ import ( ) var ( - n1 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} - n2 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} - n3 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node3"}} + n1 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: "node1"}} + n2 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2", UID: "node2"}} + n3 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node3", UID: "node3"}} ) func TestPortAllocatorAllocate(t *testing.T) { t.Parallel() + fixture := dynamicGameServerFixture() t.Run("ports are all allocated", func(t *testing.T) { m := newMocks() @@ -61,18 +62,19 @@ func TestPortAllocatorAllocate(t *testing.T) { // ports between 10 and 20 for i := 10; i <= 20; i++ { var p int32 - p, err = pa.Allocate() - assert.True(t, 10 <= p && p <= 20, "%v is not between 10 and 20", p) + gs, err := pa.Allocate(fixture.DeepCopy()) // nolint: vetshadow + assert.True(t, 10 <= gs.Spec.HostPort && gs.Spec.HostPort <= 20, "%v is not between 10 and 20", p) assert.Nil(t, err) } } // now we should have none left - _, err = pa.Allocate() + _, err = pa.Allocate(fixture.DeepCopy()) assert.Equal(t, ErrPortNotFound, err) }) t.Run("ports are unique in a node", func(t *testing.T) { + fixture := dynamicGameServerFixture() m := newMocks() pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonesInformerFactory) @@ -86,15 +88,16 @@ func TestPortAllocatorAllocate(t *testing.T) { assert.Nil(t, err) var ports []int32 for i := 10; i <= 20; i++ { - p, err := pa.Allocate() + gs, err := pa.Allocate(fixture.DeepCopy()) assert.Nil(t, err) - assert.NotContains(t, ports, p) - ports = append(ports, p) + assert.NotContains(t, ports, gs.Spec.HostPort) + ports = append(ports, gs.Spec.HostPort) } }) } func TestPortAllocatorMultithreadAllocate(t *testing.T) { + fixture := dynamicGameServerFixture() m := newMocks() pa := NewPortAllocator(10, 110, m.kubeInformationFactory, m.agonesInformerFactory) @@ -113,7 +116,8 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) { go func(i int) { for x := 0; x < 10; x++ { logrus.WithField("x", x).WithField("i", i).Info("allocating!") - _, err := pa.Allocate() + gs, err := pa.Allocate(fixture.DeepCopy()) + assert.NotEmpty(t, gs.Spec.HostPort) assert.Nil(t, err) } wg.Done() @@ -126,6 +130,7 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) { func TestPortAllocatorDeAllocate(t *testing.T) { t.Parallel() + fixture := dynamicGameServerFixture() m := newMocks() pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonesInformerFactory) nodes := []corev1.Node{n1, n2, n3} @@ -138,13 +143,25 @@ func TestPortAllocatorDeAllocate(t *testing.T) { err := pa.Run(stop) assert.Nil(t, err) - port, err := pa.Allocate() - assert.Nil(t, err) - assert.True(t, port >= 10) - assert.Equal(t, 1, countAllocatedPorts(pa, port)) - - pa.DeAllocate(port) - assert.Equal(t, 0, countAllocatedPorts(pa, port)) + for i := 0; i <= 100; i++ { + gs, err := pa.Allocate(fixture.DeepCopy()) + assert.Nil(t, err) + assert.True(t, 10 <= gs.Spec.HostPort && gs.Spec.HostPort <= 20) + assert.Equal(t, 1, countAllocatedPorts(pa, gs.Spec.HostPort)) + assert.Len(t, pa.gameServerRegistry, 1) + + // test a non allocated + nonAllocatedGS := gs.DeepCopy() + nonAllocatedGS.ObjectMeta.Name = "no" + nonAllocatedGS.ObjectMeta.UID = "no" + pa.DeAllocate(nonAllocatedGS) + assert.Equal(t, 1, countAllocatedPorts(pa, gs.Spec.HostPort)) + assert.Len(t, pa.gameServerRegistry, 1) + + pa.DeAllocate(gs) + assert.Equal(t, 0, countAllocatedPorts(pa, gs.Spec.HostPort)) + assert.Len(t, pa.gameServerRegistry, 0) + } } func TestPortAllocatorSyncPortAllocations(t *testing.T) { @@ -159,25 +176,36 @@ func TestPortAllocatorSyncPortAllocations(t *testing.T) { }) m.agonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { - gs1 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + gs1 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", UID: "1"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 10}, Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n1.ObjectMeta.Name}} - gs2 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + gs2 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", UID: "2"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 10}, Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} - gs3 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3"}, Spec: v1alpha1.GameServerSpec{HostPort: 11}, + gs3 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", UID: "3"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 11}, Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 11, NodeName: n3.ObjectMeta.Name}} - gs4 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4"}, Spec: v1alpha1.GameServerSpec{HostPort: 12}, + gs4 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", UID: "4"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 12}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Creating}} + gs5 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", UID: "5"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 12}, Status: v1alpha1.GameServerStatus{State: v1alpha1.Creating}} - gs5 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5"}, Spec: v1alpha1.GameServerSpec{HostPort: 12}, + gs6 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs6", UID: "6"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Static, HostPort: 12}, Status: v1alpha1.GameServerStatus{State: v1alpha1.Creating}} - gsl := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs1, gs2, gs3, gs4, gs5}} + gsl := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs1, gs2, gs3, gs4, gs5, gs6}} return true, gsl, nil }) stop, cancel := startInformers(m) defer cancel() - err := pa.syncPortAllocations(stop) + + err := pa.Run(stop) + assert.Nil(t, err) assert.Len(t, pa.portAllocations, 3) + assert.Len(t, pa.gameServerRegistry, 5) // count the number of allocated ports, assert.Equal(t, 2, countAllocatedPorts(pa, 10)) @@ -195,12 +223,21 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { t.Parallel() m := newMocks() - fakeWatch := watch.NewFake() - m.agonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(fakeWatch, nil)) + gsWatch := watch.NewFake() + m.agonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil)) - gs1Fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}} - gs2Fixture := gs1Fixture.DeepCopy() - gs2Fixture.ObjectMeta.Name = "gs5" + gs1 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", UID: "1"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n1.ObjectMeta.Name}} + gs2 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", UID: "2"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 11}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 11, NodeName: n1.ObjectMeta.Name}} + gs3 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", UID: "3"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} + gs4 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", UID: "4"}, + Spec: v1alpha1.GameServerSpec{PortPolicy: v1alpha1.Dynamic, HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonesInformerFactory) @@ -209,40 +246,24 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { return true, nl, nil }) - m.agonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { - gs1 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, - Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n1.ObjectMeta.Name}} - gs2 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2"}, Spec: v1alpha1.GameServerSpec{HostPort: 11}, - Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 11, NodeName: n1.ObjectMeta.Name}} - gs3 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, - Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} - - gsl := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs1, gs2, gs3}} - return true, gsl, nil - }) - stop, cancel := startInformers(m) defer cancel() - // this should do nothing, as it's before pa.Created is called - fakeWatch.Add(gs2Fixture.DeepCopy()) - fakeWatch.Delete(gs2Fixture.DeepCopy()) + gsWatch.Add(gs1.DeepCopy()) + gsWatch.Add(gs2.DeepCopy()) + gsWatch.Add(gs3.DeepCopy()) err := pa.Run(stop) assert.Nil(t, err) - nonGSPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "notagameserver"}} - fakeWatch.Add(gs1Fixture.DeepCopy()) - fakeWatch.Add(nonGSPod.DeepCopy()) - assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) - // gate pa.mutex.RLock() // reading mutable state, so read lock assert.Equal(t, 2, countAllocatedPorts(pa, 10)) assert.Equal(t, 1, countAllocatedPorts(pa, 11)) pa.mutex.RUnlock() - fakeWatch.Delete(gs1Fixture.DeepCopy()) + // delete allocated gs + gsWatch.Delete(gs3.DeepCopy()) assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) pa.mutex.RLock() // reading mutable state, so read lock @@ -250,8 +271,9 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { assert.Equal(t, 1, countAllocatedPorts(pa, 11)) pa.mutex.RUnlock() - // delete the non gameserver pod, all should be the same - fakeWatch.Delete(nonGSPod.DeepCopy()) + // delete the currently non allocated server, all should be the same + // simulated getting an old delete message + gsWatch.Delete(gs4.DeepCopy()) assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) pa.mutex.RLock() // reading mutable state, so read lock assert.Equal(t, 1, countAllocatedPorts(pa, 10)) @@ -260,6 +282,7 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { } func TestPortAllocatorNodeEvents(t *testing.T) { + fixture := dynamicGameServerFixture() m := newMocks() pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonesInformerFactory) nodeWatch := watch.NewFake() @@ -278,10 +301,11 @@ func TestPortAllocatorNodeEvents(t *testing.T) { assert.Nil(t, err) // add a game server - port, err := pa.Allocate() + gs, err := pa.Allocate(fixture.DeepCopy()) + port := gs.Spec.HostPort + assert.Nil(t, err) - gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: port}} - gsWatch.Add(&gs) + gsWatch.Add(gs) pa.mutex.RLock() assert.Len(t, pa.portAllocations, 2) @@ -329,6 +353,13 @@ func TestPortAllocatorNodeEvents(t *testing.T) { assert.Len(t, pa.portAllocations, 2) assert.Equal(t, 1, countAllocatedPorts(pa, port)) pa.mutex.RUnlock() + + // add the n1 node again, it shouldn't do anything + nodeWatch.Add(&n1) + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 2) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() } func TestNodePortAllocation(t *testing.T) { @@ -341,7 +372,8 @@ func TestNodePortAllocation(t *testing.T) { nl := &corev1.NodeList{Items: nodes} return true, nl, nil }) - result := pa.nodePortAllocation([]*corev1.Node{&n1, &n2, &n3}) + result, registry := pa.nodePortAllocation([]*corev1.Node{&n1, &n2, &n3}) + assert.Len(t, registry, 3) assert.Len(t, result, 3) for _, n := range nodes { ports, ok := result[n.ObjectMeta.Name] @@ -369,6 +401,15 @@ func TestTakePortAllocation(t *testing.T) { } } +func dynamicGameServerFixture() *v1alpha1.GameServer { + return &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default", UID: "1234"}, + Spec: v1alpha1.GameServerSpec{ + ContainerPort: 7777, + PortPolicy: v1alpha1.Dynamic, + }, + } +} + // countAllocatedPorts counts how many of a given port have been // allocated across nodes func countAllocatedPorts(pa *PortAllocator, p int32) int {