Make Port Allocator idempotent for GameServers and Node events
The port allocator wasn't idempotent for GameServer delete events
and Node addition events. This could have resulted in already-allocated
ports being handed out a second time, or in more ports being marked
as allocatable than actually exist.
markmandel committed Feb 27, 2018
1 parent 86895cc commit c2bafcc
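
For background, and not part of the commit itself: Kubernetes informers can deliver the same logical event more than once (a re-list after a watch reconnect replays adds, and deletes can be observed repeatedly), so event handlers must be idempotent. Below is a minimal sketch of the failure mode this commit guards against, using a hypothetical, heavily simplified allocator rather than the real one:

package main

import "fmt"

// naiveAllocator frees a port on every delete event, even replayed ones.
type naiveAllocator struct{ free int }

func (a *naiveAllocator) deAllocate() { a.free++ }

// guardedAllocator mirrors the commit's approach: a registry of the UIDs
// that actually hold an allocation, so a repeated event is a no-op.
type guardedAllocator struct {
	free     int
	registry map[string]bool // keyed by object UID
}

func (a *guardedAllocator) deAllocate(uid string) {
	if !a.registry[uid] {
		return // we never allocated for this UID (or already released it)
	}
	delete(a.registry, uid)
	a.free++
}

func main() {
	n := &naiveAllocator{}
	n.deAllocate()      // the informer delivers the delete event...
	n.deAllocate()      // ...and then replays it
	fmt.Println(n.free) // 2: one real port freed, two counted as free

	g := &guardedAllocator{registry: map[string]bool{"gs-uid": true}}
	g.deAllocate("gs-uid")
	g.deAllocate("gs-uid")
	fmt.Println(g.free) // 1: the replay is ignored
}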
Showing 3 changed files with 181 additions and 102 deletions.
8 changes: 3 additions & 5 deletions pkg/gameservers/controller.go
@@ -313,13 +313,11 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe
 		return gs, nil
 	}
 
-	gsCopy := gs.DeepCopy()
-
-	port, err := c.portAllocator.Allocate()
+	gsCopy, err := c.portAllocator.Allocate(gs.DeepCopy())
 	if err != nil {
 		return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name)
 	}
-	gsCopy.Spec.HostPort = port
 
 	gsCopy.Status.State = stablev1alpha1.Creating
 	c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated")
 
@@ -328,7 +326,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe
 	if err != nil {
 		// if the GameServer doesn't get updated with the port data, then put the port
 		// back in the pool, as it will get retried on the next pass
-		c.portAllocator.DeAllocate(port)
+		c.portAllocator.DeAllocate(gsCopy)
 		return gs, errors.Wrapf(err, "error updating GameServer %s to default values", gs.Name)
 	}
 
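The reworked contract is easiest to see from the caller's side: Allocate now takes a GameServer and returns it with Spec.HostPort stamped and its UID recorded, and DeAllocate takes the same object back. Below is a runnable sketch of that allocate-then-roll-back pattern with stand-in types (not the real Agones API):

package main

import (
	"errors"
	"fmt"
)

// GameServer and PortAllocator are stand-ins for the real types in this diff.
type GameServer struct {
	UID      string
	HostPort int32
}

type PortAllocator struct {
	free     []int32
	registry map[string]bool // UIDs this allocator handed a port to
}

// Allocate assigns a free port to gs, records its UID, and returns gs.
func (pa *PortAllocator) Allocate(gs *GameServer) (*GameServer, error) {
	if len(pa.free) == 0 {
		return gs, errors.New("no port available")
	}
	gs.HostPort = pa.free[0]
	pa.free = pa.free[1:]
	pa.registry[gs.UID] = true
	return gs, nil
}

// DeAllocate returns gs's port to the pool, but only if this allocator
// handed it out; repeated calls are safe no-ops.
func (pa *PortAllocator) DeAllocate(gs *GameServer) {
	if !pa.registry[gs.UID] {
		return
	}
	pa.free = append(pa.free, gs.HostPort)
	delete(pa.registry, gs.UID)
}

func main() {
	pa := &PortAllocator{free: []int32{7000}, registry: map[string]bool{}}

	gsCopy, err := pa.Allocate(&GameServer{UID: "gs-uid"})
	if err != nil {
		panic(err)
	}

	// Mirror the controller: if the API update of gsCopy fails, put the
	// port back so the next reconcile pass can retry cleanly.
	updateFailed := true
	if updateFailed {
		pa.DeAllocate(gsCopy)
		pa.DeAllocate(gsCopy) // replayed event: still safe
	}
	fmt.Println(len(pa.free)) // 1: the port is back in the pool exactly once
}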
122 changes: 81 additions & 41 deletions pkg/gameservers/portallocator.go
@@ -25,6 +25,7 @@ import (
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	corelisterv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -45,6 +46,8 @@ type PortAllocator struct {
 	logger             *logrus.Entry
 	mutex              sync.RWMutex
 	portAllocations    []portAllocation
+	gameServerRegistry map[types.UID]bool
+	nodeRegistry       map[types.UID]bool
 	minPort            int32
 	maxPort            int32
 	gameServerSynced   cache.InformerSynced
@@ -70,6 +73,8 @@ func NewPortAllocator(minPort, maxPort int32,
 		mutex:              sync.RWMutex{},
 		minPort:            minPort,
 		maxPort:            maxPort,
+		gameServerRegistry: map[types.UID]bool{},
+		nodeRegistry:       map[types.UID]bool{},
 		gameServerSynced:   gameServers.Informer().HasSynced,
 		gameServerLister:   gameServers.Lister(),
 		gameServerInformer: gameServers.Informer(),
@@ -79,14 +84,6 @@ func NewPortAllocator(minPort, maxPort int32,
 	}
 	pa.logger = runtime.NewLoggerWithType(pa)
 
-	pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
-	return pa
-}
-
-// Run sets up the current state of port allocations and
-// starts tracking Pod and Node changes
-func (pa *PortAllocator) Run(stop <-chan struct{}) error {
-	pa.logger.Info("Running")
 	pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		DeleteFunc: pa.syncDeleteGameServer,
 	})
@@ -98,76 +95,110 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 			oldNode := oldObj.(*corev1.Node)
 			newNode := newObj.(*corev1.Node)
 			if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
-				err := pa.syncPortAllocations(stop)
+				err := pa.syncPortAllocations()
 				if err != nil {
 					err := errors.Wrap(err, "error resetting ports on node update")
 					runtime.HandleError(pa.logger.WithField("node", newNode), err)
 				}
 			}
 		},
 		DeleteFunc: func(obj interface{}) {
-			err := pa.syncPortAllocations(stop)
+			err := pa.syncPortAllocations()
 			if err != nil {
 				err := errors.Wrap(err, "error on node deletion")
 				runtime.HandleError(pa.logger.WithField("node", obj), err)
 			}
 		},
 	})
 
-	pa.logger.Info("Flush cache sync, before syncing gameserver and node state")
+	pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
+	return pa
+}
+
+// Run sets up the current state of port allocations and
+// starts tracking Pod and Node changes (non blocking)
+func (pa *PortAllocator) Run(stop <-chan struct{}) error {
+	pa.logger.Info("Running")
+
 	if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
-		return nil
+		return errors.New("failed to wait for caches to sync")
 	}
 
-	return pa.syncPortAllocations(stop)
+	return pa.syncPortAllocations()
 }

-// Allocate allocates a port. Return ErrPortNotFound if no port is
-// allocatable
-func (pa *PortAllocator) Allocate() (int32, error) {
+// Allocate assigns a port to the GameServer and returns it.
+// Return ErrPortNotFound if no port is allocatable
+func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer, error) {
+	if gs.Spec.PortPolicy != v1alpha1.Dynamic {
+		return gs, errors.Errorf("Port policy of %s is not supported for port allocation", gs.Spec.PortPolicy)
+	}
+
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()
 	for _, n := range pa.portAllocations {
 		for p, taken := range n {
 			if !taken {
 				n[p] = true
-				return p, nil
+				pa.gameServerRegistry[gs.ObjectMeta.UID] = true
+				gs.Spec.HostPort = p
+				return gs, nil
 			}
 		}
 	}
-	return -1, ErrPortNotFound
+	return gs, ErrPortNotFound
 }

 // DeAllocate marks the given port as no longer allocated
-func (pa *PortAllocator) DeAllocate(port int32) {
+func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {
+	if gs.Spec.PortPolicy != v1alpha1.Dynamic {
+		return
+	}
+	if gs.Spec.HostPort < pa.minPort || gs.Spec.HostPort > pa.maxPort {
+		return
+	}
+	// skip if it wasn't previously allocated
+	if _, ok := pa.gameServerRegistry[gs.ObjectMeta.UID]; !ok {
+		pa.logger.WithField("gs", gs.ObjectMeta.Name).
+			Info("Did not allocate this GameServer. Ignoring for DeAllocation")
+		return
+	}
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()
-	pa.portAllocations = setPortAllocation(port, pa.portAllocations, false)
+	pa.portAllocations = setPortAllocation(gs.Spec.HostPort, pa.portAllocations, false)
+	delete(pa.gameServerRegistry, gs.ObjectMeta.UID)
 }

 // syncAddNode adds another node port section
 // to the available ports
 func (pa *PortAllocator) syncAddNode(obj interface{}) {
-	pa.mutex.Lock()
-	defer pa.mutex.Unlock()
-
 	node := obj.(*corev1.Node)
+	// if we've already added this node, don't do it again
+	if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
+		pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
+		return
+	}
+
 	pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")
+	pa.mutex.Lock()
+	defer pa.mutex.Unlock()
+
 	ports := portAllocation{}
 	for i := pa.minPort; i <= pa.maxPort; i++ {
 		ports[i] = false
 	}
 
 	pa.portAllocations = append(pa.portAllocations, ports)
+	pa.nodeRegistry[node.ObjectMeta.UID] = true
 }

 // syncDeleteGameServer when a GameServer Pod is deleted
 // make the HostPort available
 func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
-	gs := object.(*v1alpha1.GameServer)
-	pa.logger.WithField("gs", gs).Info("syncing deleted GameServer")
-	pa.DeAllocate(gs.Spec.HostPort)
+	if gs, ok := object.(*v1alpha1.GameServer); ok {
+		pa.logger.WithField("gs", gs).Info("syncing deleted GameServer")
+		pa.DeAllocate(gs)
+	}
 }

 // syncPortAllocations syncs the pod, node and gameserver caches then
@@ -176,39 +207,41 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
 // portAllocations are marked as taken.
 // Locks the mutex while doing this.
 // This is basically a stop the world Garbage Collection on port allocations.
-func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error {
+func (pa *PortAllocator) syncPortAllocations() error {
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()
 
 	pa.logger.Info("Resetting Port Allocation")
 
-	if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
-		return nil
-	}
-
 	nodes, err := pa.nodeLister.List(labels.Everything())
 	if err != nil {
 		return errors.Wrap(err, "error listing all nodes")
 	}
 
 	// setup blank port values
-	nodePorts := pa.nodePortAllocation(nodes)
+	nodePorts, nodeRegistry := pa.nodePortAllocation(nodes)
 
 	gameservers, err := pa.gameServerLister.List(labels.Everything())
 	if err != nil {
 		return errors.Wrapf(err, "error listing all GameServers")
 	}
 
+	gsRegistry := map[types.UID]bool{}
+
 	// place to put GameServer port allocations that are not ready yet/after the ready state
 	var nonReadyNodesPorts []int32
 	// Check GameServers as well, as some
 	for _, gs := range gameservers {
-		// if the node doesn't exist, it's likely unscheduled
-		_, ok := nodePorts[gs.Status.NodeName]
-		if gs.Status.NodeName != "" && ok {
-			nodePorts[gs.Status.NodeName][gs.Status.Port] = true
-		} else if gs.Spec.HostPort != 0 {
-			nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort)
+		if gs.Spec.PortPolicy == v1alpha1.Dynamic {
+			gsRegistry[gs.ObjectMeta.UID] = true
+
+			// if the node doesn't exist, it's likely unscheduled
+			_, ok := nodePorts[gs.Status.NodeName]
+			if gs.Status.NodeName != "" && ok {
+				nodePorts[gs.Status.NodeName][gs.Status.Port] = true
+			} else if gs.Spec.HostPort != 0 {
+				nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort)
+			}
 		}
 	}

@@ -229,15 +262,21 @@ func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error {
 	}
 
 	pa.portAllocations = allocations
+	pa.gameServerRegistry = gsRegistry
+	pa.nodeRegistry = nodeRegistry
 
 	return nil
 }

 // nodePortAllocation returns a map of port allocations all set to being available
-// with a map key for each node
-func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
+// with a map key for each node, as well as the node registry record (since we're already looping)
+func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) (map[string]portAllocation, map[types.UID]bool) {
 	nodePorts := map[string]portAllocation{}
+	nodeRegistry := map[types.UID]bool{}
+
 	for _, n := range nodes {
+		nodeRegistry[n.ObjectMeta.UID] = true
+
 		// ignore unschedulable nodes
 		if !n.Spec.Unschedulable {
 			nodePorts[n.Name] = portAllocation{}
@@ -246,7 +285,8 @@ func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]por
 			}
 		}
 	}
-	return nodePorts
+
+	return nodePorts, nodeRegistry
 }
 
 // setPortAllocation takes a port from an all
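
The node path uses the same registry guard. Here is a compact sketch with simplified stand-in types (not the real implementation) of why syncAddNode must skip a node it has already registered; without the check, each replayed add event would append another block of ports for the same node:

package main

import "fmt"

type nodeAllocator struct {
	portAllocations []map[int32]bool // one port map per registered node
	nodeRegistry    map[string]bool  // node UID -> already added
	minPort         int32
	maxPort         int32
}

// syncAddNode appends a block of available ports for a node, exactly once.
func (na *nodeAllocator) syncAddNode(uid string) {
	if na.nodeRegistry[uid] {
		return // already tracked: a replayed add event changes nothing
	}
	ports := map[int32]bool{}
	for p := na.minPort; p <= na.maxPort; p++ {
		ports[p] = false // false = available
	}
	na.portAllocations = append(na.portAllocations, ports)
	na.nodeRegistry[uid] = true
}

func main() {
	na := &nodeAllocator{nodeRegistry: map[string]bool{}, minPort: 7000, maxPort: 7002}
	na.syncAddNode("node-1")
	na.syncAddNode("node-1") // a resync replays the same event
	fmt.Println(len(na.portAllocations)) // 1, not 2
}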
