diff --git a/pkg/ifaces/watcher.go b/pkg/ifaces/watcher.go index 12dc12575..9820860f5 100644 --- a/pkg/ifaces/watcher.go +++ b/pkg/ifaces/watcher.go @@ -7,11 +7,13 @@ import ( "path/filepath" "sync" "syscall" + "time" "github.com/fsnotify/fsnotify" "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" "github.com/vishvananda/netns" + "k8s.io/apimachinery/pkg/util/wait" ) const ( @@ -45,13 +47,12 @@ func NewWatcher(bufLen int) *Watcher { func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { out := make(chan Event, w.bufLen) - nsHandles, err := getNetNSHandles() + netns, err := getNetNS() if err != nil { - go w.sendUpdates(ctx, netns.None(), out) + go w.sendUpdates(ctx, "", out) } else { - for _, nsh := range nsHandles { - nsHandle := nsh - go w.sendUpdates(ctx, nsHandle, out) + for _, n := range netns { + go w.sendUpdates(ctx, n, out) } } // register to get notification when netns is created or deleted and register for link update for new netns @@ -59,13 +60,30 @@ func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) { return out, nil } -func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, out chan Event) { +func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { + var netnsHandle netns.NsHandle + var err error log := logrus.WithField("component", "ifaces.Watcher") // subscribe for interface events links := make(chan netlink.LinkUpdate) - log.WithField("netns", netnsHandle.String()).Debug("linkSubscribe to receive links update") - if err := w.linkSubscriberAt(netnsHandle, links, ctx.Done()); err != nil { - log.WithError(err).Errorf("can't subscribe to links netns %s", netnsHandle.String()) + if err = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) { + if ns == "" { + netnsHandle = netns.None() + } else { + if netnsHandle, err = netns.GetFromName(ns); err != nil { + return false, nil + } + } + log.WithFields(logrus.Fields{ + "netns": ns, + "netnsHandle": netnsHandle.String(), + }).Debug("linkSubscribe to receive links update") + if err = w.linkSubscriberAt(netnsHandle, links, ctx.Done()); err != nil { + return false, nil + } + return true, nil + }); err != nil { + log.WithError(err).Errorf("can't subscribe to links netns %s netnsHandle %s", ns, netnsHandle.String()) return } @@ -119,7 +137,7 @@ func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, o } } -func getNetNSHandles() ([]netns.NsHandle, error) { +func getNetNS() ([]string, error) { log := logrus.WithField("component", "ifaces.Watcher") files, err := os.ReadDir(netnsVolume) if err != nil { @@ -127,27 +145,20 @@ func getNetNSHandles() ([]netns.NsHandle, error) { return nil, fmt.Errorf("failed to list network-namespaces: %w", err) } - handles := []netns.NsHandle{netns.None()} + netns := []string{""} if len(files) == 0 { log.WithField("netns", files).Debug("empty network-namespaces list") - return handles, nil + return netns, nil } for _, f := range files { ns := f.Name() - handle, err := netns.GetFromName(ns) - if err != nil { - log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring") - continue - } - handles = append(handles, handle) + netns = append(netns, ns) log.WithFields(logrus.Fields{ - "netns": ns, - "handle": handle.String(), + "netns": ns, }).Debug("Detected network-namespace") - } - return handles, nil + return netns, nil } func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { @@ -170,12 +181,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) { if event.Op&fsnotify.Create == fsnotify.Create { ns := filepath.Base(event.Name) log.WithField("netns", ns).Debug("netns notification") - handle, err := netns.GetFromName(ns) - if err != nil { - log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring") - return - } - go w.sendUpdates(ctx, handle, out) + go w.sendUpdates(ctx, ns, out) } case err, ok := <-w.netnsWatcher.Errors: if !ok {