Skip to content

Commit

Permalink
NETOBSERV-1697: Add retry around netlinkSubscribeAt
Browse files: browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Jun 26, 2024
1 parent fdebe3f commit 116b150
Showing 1 changed file with 42 additions and 28 deletions.
70 changes: 42 additions & 28 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"path/filepath"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand Down Expand Up @@ -45,27 +47,51 @@ func NewWatcher(bufLen int) *Watcher {
func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
out := make(chan Event, w.bufLen)

nsHandles, err := getNetNSHandles()
netns, err := getNetNS()
if err != nil {
go w.sendUpdates(ctx, netns.None(), out)
go w.sendUpdates(ctx, "", out)
} else {
for _, nsh := range nsHandles {
nsHandle := nsh
go w.sendUpdates(ctx, nsHandle, out)
for _, n := range netns {
go w.sendUpdates(ctx, n, out)
}
}
// Register for notifications when a netns is created or deleted, and subscribe to link updates for each new netns.
w.netnsNotify(ctx, out)
return out, nil
}

func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, out chan Event) {
func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
var netnsHandle netns.NsHandle
var err error
log := logrus.WithField("component", "ifaces.Watcher")
// subscribe for interface events
links := make(chan netlink.LinkUpdate)
log.WithField("netns", netnsHandle.String()).Debug("linkSubscribe to receive links update")
if err := w.linkSubscriberAt(netnsHandle, links, ctx.Done()); err != nil {
log.WithError(err).Errorf("can't subscribe to links netns %s", netnsHandle.String())
doneChan := make(chan struct{})
if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
if ns == "" {
netnsHandle = netns.None()
} else {
if netnsHandle, err = netns.GetFromName(ns); err != nil {
return false, nil
}
}

if err = w.linkSubscriberAt(netnsHandle, links, doneChan); err != nil {
log.WithFields(logrus.Fields{
"netns": ns,
"netnsHandle": netnsHandle.String(),
"error": err,
}).Debug("linkSubscribe failed retry")
return false, nil
}

log.WithFields(logrus.Fields{
"netns": ns,
"netnsHandle": netnsHandle.String(),
}).Debug("linkSubscribe to receive links update")
return true, nil
}); err != nil {
log.WithError(err).Errorf("can't subscribe to links netns %s netnsHandle %s", ns, netnsHandle.String())
return
}

Expand Down Expand Up @@ -119,35 +145,28 @@ func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, o
}
}

func getNetNSHandles() ([]netns.NsHandle, error) {
func getNetNS() ([]string, error) {
log := logrus.WithField("component", "ifaces.Watcher")
files, err := os.ReadDir(netnsVolume)
if err != nil {
log.Warningf("can't detect any network-namespaces err: %v [Ignore if the agent privileged flag is not set]", err)
return nil, fmt.Errorf("failed to list network-namespaces: %w", err)
}

handles := []netns.NsHandle{netns.None()}
netns := []string{""}
if len(files) == 0 {
log.WithField("netns", files).Debug("empty network-namespaces list")
return handles, nil
return netns, nil
}
for _, f := range files {
ns := f.Name()
handle, err := netns.GetFromName(ns)
if err != nil {
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
continue
}
handles = append(handles, handle)
netns = append(netns, ns)
log.WithFields(logrus.Fields{
"netns": ns,
"handle": handle.String(),
"netns": ns,
}).Debug("Detected network-namespace")

}

return handles, nil
return netns, nil
}

func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
Expand All @@ -170,12 +189,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
if event.Op&fsnotify.Create == fsnotify.Create {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns notification")
handle, err := netns.GetFromName(ns)
if err != nil {
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
return
}
go w.sendUpdates(ctx, handle, out)
go w.sendUpdates(ctx, ns, out)
}
case err, ok := <-w.netnsWatcher.Errors:
if !ok {
Expand Down

0 comments on commit 116b150

Please sign in to comment.