Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

watch for Kubernetes node delete events and reclaim removed peers IP space on delete event #3399

Merged
merged 8 commits into from
Nov 1, 2018
69 changes: 68 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@ package api
import (
"fmt"
"net"
"net/url"
)

// Expose calls the router to assign the given IP addr to the weave bridge.
func (client *Client) Expose(ipAddr *net.IPNet) error {
_, err := client.httpVerb("POST", fmt.Sprintf("/expose/%s", ipAddr), nil)
return err
}

// ReplacePeers replace the current set of peers
func (client *Client) ReplacePeers(peers []string) error {
_, err := client.httpVerb("POST", "/connect", url.Values{"replace": {"true"}, "peer": peers})
return err
}
70 changes: 67 additions & 3 deletions prog/kube-utils/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ package main
import (
"flag"
"fmt"
"math/rand"
"net"
"os"
"os/signal"
"syscall"
"time"

api "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

weaveapi "github.com/weaveworks/weave/api"
"github.com/weaveworks/weave/common"
Expand Down Expand Up @@ -65,7 +71,8 @@ type weaveClient interface {

// For each of those peers that is no longer listed as a node by
// Kubernetes, remove it from Weave IPAM
func reclaimRemovedPeers(kube kubernetes.Interface, weave weaveClient, cml *configMapAnnotations, myPeerName, myNodeName string) error {
func reclaimRemovedPeers(kube kubernetes.Interface, cml *configMapAnnotations, myPeerName, myNodeName string) error {
weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
for loopsWhenNothingChanged := 0; loopsWhenNothingChanged < 3; loopsWhenNothingChanged++ {
if err := cml.Init(); err != nil {
return err
Expand Down Expand Up @@ -194,6 +201,52 @@ func reclaimPeer(weave weaveClient, cml *configMapAnnotations, storedPeerList *p
return true, err
}

// resetPeers replaces the peers list with current set of peers
func resetPeers(kube kubernetes.Interface) error {
nodes, err := getKubePeers(kube, false)
if err != nil {
return err
}
peerList := make([]string, 0)
for _, node := range nodes {
peerList = append(peerList, node.addr)
}
weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
err = weave.ReplacePeers(peerList)
if err != nil {
return err
}
return nil
}

// regiesters with Kubernetes API server for node delete events. Node delete event handler
// invokes reclaimRemovedPeers to remove it from IPAM so that IP space is reclaimed
func registerForNodeUpdates(client *kubernetes.Clientset, stopCh <-chan struct{}, nodeName, peerName string) {
informerFactory := informers.NewSharedInformerFactory(client, 0)
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
common.Log.Debugln("registering for updates for node delete events")
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
// add random delay to avoid all nodes acting on node delete event at the same
// time leading to contention to use `weave-net` configmap
r := rand.Intn(5000)
time.Sleep(time.Duration(r) * time.Millisecond)

cml := newConfigMapAnnotations(configMapNamespace, configMapName, client)
err := reclaimRemovedPeers(client, cml, peerName, nodeName)
if err != nil {
common.Log.Fatalf("[kube-peers] Error while reclaiming space: %v", err)
}
err = resetPeers(client)
if err != nil {
common.Log.Fatalf("[kube-peers] Error resetting peer list: %v", err)
}
},
})
informerFactory.WaitForCacheSync(stopCh)
informerFactory.Start(stopCh)
}

func main() {
var (
justReclaim bool
Expand All @@ -202,8 +255,10 @@ func main() {
peerName string
nodeName string
logLevel string
runReclaimDaemon bool
)
flag.BoolVar(&justReclaim, "reclaim", false, "reclaim IP space from dead peers")
flag.BoolVar(&runReclaimDaemon, "run-reclaim-daemon", false, "run background process that reclaim IP space from dead peers ")
flag.BoolVar(&justCheck, "check-peer-new", false, "return success if peer name is not stored in annotation")
flag.BoolVar(&justSetNodeStatus, "set-node-status", false, "set NodeNetworkUnavailable to false")
flag.StringVar(&peerName, "peer-name", "unknown", "name of this Weave Net peer")
Expand Down Expand Up @@ -251,8 +306,7 @@ func main() {
}
common.Log.Infoln("[kube-peers] Added myself to peer list", list)

weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
err = reclaimRemovedPeers(c, weave, cml, peerName, nodeName)
err = reclaimRemovedPeers(c, cml, peerName, nodeName)
if err != nil {
common.Log.Fatalf("[kube-peers] Error while reclaiming space: %v", err)
}
Expand All @@ -262,4 +316,14 @@ func main() {
for _, node := range peers {
fmt.Println(node.addr)
}

if runReclaimDaemon {
// Handle SIGINT and SIGTERM
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
stopCh := make(chan struct{})
registerForNodeUpdates(c, stopCh, nodeName, peerName)
<-ch
close(stopCh)
}
}
2 changes: 2 additions & 0 deletions prog/weave-kube/launch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ post_start_actions() {

# Mark network as up
/home/weave/kube-utils -set-node-status -node-name="$HOSTNAME"

/home/weave/kube-utils -run-reclaim-daemon -node-name="$HOSTNAME" -peer-name="$PEERNAME" -log-level=debug&
}

post_start_actions &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ function relaunch_weave_pod {
# Suite
#
function main {
local IPAM_RECOVER_DELAY=15
local IPAM_RECOVER_DELAY=90

start_suite "Test weave-net deallocates from IPAM on node failure";

Expand All @@ -100,8 +100,6 @@ function main {

force_drop_node $HOST2;

relaunch_weave_pod $HOST3;

sleep $IPAM_RECOVER_DELAY;

greyly echo "Checking unreachable IPs"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading