Skip to content

Commit

Permalink
NETOBSERV-1697: Add retry around netlinkSubscribeAt
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Jun 28, 2024
1 parent fdebe3f commit 3f6b9c8
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 64 deletions.
10 changes: 10 additions & 0 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
originalNs, err := netns.Get()
if err != nil {
return fmt.Errorf("failed to get current netns: %w", err)
}
defer func() {
if err := netns.Set(originalNs); err != nil {
ilog.WithError(err).Error("failed to set netns back")
}
originalNs.Close()
}()
if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err)
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/ifaces/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package ifaces
import (
"context"
"fmt"
"net"

"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
)

Expand Down Expand Up @@ -49,14 +48,22 @@ type Informer interface {
Subscribe(ctx context.Context) (<-chan Event, error)
}

func netInterfaces() ([]Interface, error) {
ifs, err := net.Interfaces()
func netInterfaces(nsh netns.NsHandle) ([]Interface, error) {
handle, err := netlink.NewHandleAt(nsh)
if err != nil {
return nil, fmt.Errorf("can't fetch interfaces: %w", err)
return nil, fmt.Errorf("failed to create handle for netns (%s): %w", nsh.String(), err)
}
names := make([]Interface, len(ifs))
for i, ifc := range ifs {
names[i] = Interface{Name: ifc.Name, Index: ifc.Index, NetNS: netns.None()}
defer handle.Delete()

// Get a list of interfaces in the namespace
links, err := handle.LinkList()
if err != nil {
return nil, fmt.Errorf("failed to list interfaces in netns (%s): %w", nsh.String(), err)
}

names := make([]Interface, len(links))
for i, link := range links {
names[i] = Interface{Name: link.Attrs().Name, Index: link.Attrs().Index, NetNS: nsh}
}
return names, nil
}
67 changes: 45 additions & 22 deletions pkg/ifaces/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"time"

"github.com/sirupsen/logrus"
"github.com/vishvananda/netns"
)

// Poller periodically looks for the network interfaces in the system and forwards Event
// notifications when interfaces are added or deleted.
type Poller struct {
period time.Duration
current map[Interface]struct{}
interfaces func() ([]Interface, error)
interfaces func(handle netns.NsHandle) ([]Interface, error)
bufLen int
}

Expand All @@ -26,32 +27,54 @@ func NewPoller(period time.Duration, bufLen int) *Poller {
}

func (np *Poller) Subscribe(ctx context.Context) (<-chan Event, error) {
log := logrus.WithField("component", "ifaces.Poller")
log.WithField("period", np.period).Debug("subscribing to Interface events")

out := make(chan Event, np.bufLen)
go func() {
ticker := time.NewTicker(np.period)
defer ticker.Stop()
for {
if ifaces, err := np.interfaces(); err != nil {
log.WithError(err).Warn("fetching interface names")
} else {
log.WithField("names", ifaces).Debug("fetched interface names")
np.diffNames(out, ifaces)
}
select {
case <-ctx.Done():
log.Debug("stopped")
close(out)
return
case <-ticker.C:
// continue after period
}
netns, err := getNetNS()
if err != nil {
go np.pollForEvents(ctx, "", out)
} else {
for _, n := range netns {
go np.pollForEvents(ctx, n, out)
}
}()
}
return out, nil
}

func (np *Poller) pollForEvents(ctx context.Context, ns string, out chan Event) {
log := logrus.WithField("component", "ifaces.Poller")
log.WithField("period", np.period).Debug("subscribing to Interface events")
ticker := time.NewTicker(np.period)
var netnsHandle netns.NsHandle
var err error

if ns == "" {
netnsHandle = netns.None()
} else {
netnsHandle, err = netns.GetFromName(ns)
if err != nil {
return
}
}

defer ticker.Stop()
for {
if ifaces, err := np.interfaces(netnsHandle); err != nil {
log.WithError(err).Warn("fetching interface names")
} else {
log.WithField("names", ifaces).Debug("fetched interface names")
np.diffNames(out, ifaces)
}
select {
case <-ctx.Done():
log.Debug("stopped")
close(out)
return
case <-ticker.C:
// continue after a period
}
}
}

// diffNames compares and updates the internal account of interfaces with the latest list of
// polled interfaces. It forwards Events for any detected addition or removal of interfaces.
func (np *Poller) diffNames(events chan Event, ifaces []Interface) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ifaces/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestPoller(t *testing.T) {
// fake net.Interfaces implementation that returns two different sets of
// interfaces on successive invocations
firstInvocation := true
var fakeInterfaces = func() ([]Interface, error) {
var fakeInterfaces = func(handle netns.NsHandle) ([]Interface, error) {
if firstInvocation {
firstInvocation = false
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ifaces/registerer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestRegisterer(t *testing.T) {
watcher := NewWatcher(10)
registry := NewRegisterer(watcher, 10)
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered
watcher.interfaces = func() ([]Interface, error) {
watcher.interfaces = func(handle netns.NsHandle) ([]Interface, error) {
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil
}
inputLinks := make(chan netlink.LinkUpdate, 10)
Expand Down
76 changes: 45 additions & 31 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"path/filepath"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand All @@ -23,7 +25,7 @@ const (
type Watcher struct {
bufLen int
current map[Interface]struct{}
interfaces func() ([]Interface, error)
interfaces func(handle netns.NsHandle) ([]Interface, error)
// linkSubscriber abstracts netlink.LinkSubscribe implementation, allowing the injection of
// mocks for unit testing
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
Expand All @@ -45,34 +47,58 @@ func NewWatcher(bufLen int) *Watcher {
func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
out := make(chan Event, w.bufLen)

nsHandles, err := getNetNSHandles()
netns, err := getNetNS()
if err != nil {
go w.sendUpdates(ctx, netns.None(), out)
go w.sendUpdates(ctx, "", out)
} else {
for _, nsh := range nsHandles {
nsHandle := nsh
go w.sendUpdates(ctx, nsHandle, out)
for _, n := range netns {
go w.sendUpdates(ctx, n, out)
}
}
// register to get notification when netns is created or deleted and register for link update for new netns
w.netnsNotify(ctx, out)
return out, nil
}

func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, out chan Event) {
func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
var netnsHandle netns.NsHandle
var err error
log := logrus.WithField("component", "ifaces.Watcher")
// subscribe for interface events
links := make(chan netlink.LinkUpdate)
log.WithField("netns", netnsHandle.String()).Debug("linkSubscribe to receive links update")
if err := w.linkSubscriberAt(netnsHandle, links, ctx.Done()); err != nil {
log.WithError(err).Errorf("can't subscribe to links netns %s", netnsHandle.String())
doneChan := make(chan struct{})
if err = wait.PollUntilContextTimeout(ctx, 50*time.Microsecond, time.Second, true, func(ctx context.Context) (done bool, err error) {
if ns == "" {
netnsHandle = netns.None()
} else {
if netnsHandle, err = netns.GetFromName(ns); err != nil {
return false, nil
}
}

if err = w.linkSubscriberAt(netnsHandle, links, doneChan); err != nil {
log.WithFields(logrus.Fields{
"netns": ns,
"netnsHandle": netnsHandle.String(),
"error": err,
}).Debug("linkSubscribe failed retry")
return false, nil
}

log.WithFields(logrus.Fields{
"netns": ns,
"netnsHandle": netnsHandle.String(),
}).Debug("linkSubscribe to receive links update")
return true, nil
}); err != nil {
log.WithError(err).Errorf("can't subscribe to links netns %s netnsHandle %s", ns, netnsHandle.String())
return
}

// before sending netlink updates, send all the existing interfaces at the moment of starting
// the Watcher
if netnsHandle.Equal(netns.None()) {
if names, err := w.interfaces(); err != nil {
if netnsHandle.IsOpen() || netnsHandle.Equal(netns.None()) {
if names, err := w.interfaces(netnsHandle); err != nil {
log.WithError(err).Error("can't fetch network interfaces. You might be missing flows")
} else {
for _, name := range names {
Expand Down Expand Up @@ -119,35 +145,28 @@ func (w *Watcher) sendUpdates(ctx context.Context, netnsHandle netns.NsHandle, o
}
}

func getNetNSHandles() ([]netns.NsHandle, error) {
func getNetNS() ([]string, error) {
log := logrus.WithField("component", "ifaces.Watcher")
files, err := os.ReadDir(netnsVolume)
if err != nil {
log.Warningf("can't detect any network-namespaces err: %v [Ignore if the agent privileged flag is not set]", err)
return nil, fmt.Errorf("failed to list network-namespaces: %w", err)
}

handles := []netns.NsHandle{netns.None()}
netns := []string{""}
if len(files) == 0 {
log.WithField("netns", files).Debug("empty network-namespaces list")
return handles, nil
return netns, nil
}
for _, f := range files {
ns := f.Name()
handle, err := netns.GetFromName(ns)
if err != nil {
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
continue
}
handles = append(handles, handle)
netns = append(netns, ns)
log.WithFields(logrus.Fields{
"netns": ns,
"handle": handle.String(),
"netns": ns,
}).Debug("Detected network-namespace")

}

return handles, nil
return netns, nil
}

func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
Expand All @@ -170,12 +189,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
if event.Op&fsnotify.Create == fsnotify.Create {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns notification")
handle, err := netns.GetFromName(ns)
if err != nil {
log.WithField("netns", ns).Debug("can't get NsHandle for this netns. Ignoring")
return
}
go w.sendUpdates(ctx, handle, out)
go w.sendUpdates(ctx, ns, out)
}
case err, ok := <-w.netnsWatcher.Errors:
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ifaces/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestWatcher(t *testing.T) {

watcher := NewWatcher(10)
// mock net.Interfaces and linkSubscriber to control which interfaces are discovered
watcher.interfaces = func() ([]Interface, error) {
watcher.interfaces = func(handle netns.NsHandle) ([]Interface, error) {
return []Interface{{"foo", 1, netns.None()}, {"bar", 2, netns.None()}, {"baz", 3, netns.None()}}, nil
}
inputLinks := make(chan netlink.LinkUpdate, 10)
Expand Down

0 comments on commit 3f6b9c8

Please sign in to comment.