Skip to content

Commit

Permalink
[Multicast] Support removal of staled outbound multicast routes
Browse files Browse the repository at this point in the history
Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Mar 4, 2022
1 parent 871806d commit 5003af1
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 1 deletion.
33 changes: 33 additions & 0 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +39,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -72,6 +74,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -165,6 +168,12 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
createdTime: time.Now(),
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand Down Expand Up @@ -200,11 +209,33 @@ type inboundMulticastRouteEntry struct {
vif uint16
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
// Field pktCount and createdTime are used for removing staled multicast routes.
type outboundMulticastRouteEntry struct {
group string
src string
pktCount uint32
createdTime time.Time
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -272,6 +303,8 @@ type RouteInterface interface {
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
// GetoutboundMroutePacketCount returns number of routed by outboundRoute entry.
GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
Expand Down
42 changes: 42 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
outboundMRouteTimeout = time.Minute * 10
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,46 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

go wait.NonSlidingUntil(c.updateOutboundMrouteStats, outboundMRouteTimeout, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateOutboundMrouteStats() {
klog.V(2).InfoS("Updating outbound multicast route statistics and removing staled routes")
deletedOutboundRoutes := make([]*outboundMulticastRouteEntry, 0)
for _, obj := range c.outboundRouteCache.List() {
outboundRoute, _ := obj.(*outboundMulticastRouteEntry)
packetCount, err := c.socket.GetoutboundMroutePacketCount(net.ParseIP(outboundRoute.src), net.ParseIP(outboundRoute.group))
if err != nil {
klog.ErrorS(err, "Failed to getpacket count for outbound multicast route", "outboundRoute", outboundRoute)
return
}
packetCountDiff := packetCount - outboundRoute.pktCount
klog.V(4).Infof("Outbound multicast route %v routes %d packets in last %s", outboundRoute, packetCountDiff, outboundMRouteTimeout.String())
if packetCountDiff == uint32(0) {
deletedOutboundRoutes = append(deletedOutboundRoutes, outboundRoute)
} else {
outboundRoute.pktCount = packetCount
c.outboundRouteCache.Update(outboundRoute)
}
}
now := time.Now()
for _, outboundRoute := range deletedOutboundRoutes {
klog.Infof("Deleting staled outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
if now.Sub(outboundRoute.createdTime) < outboundMRouteTimeout {
continue
}
err := c.socket.DelMrouteEntry(net.ParseIP(outboundRoute.src), net.ParseIP(outboundRoute.group), c.internalInterfaceVIF)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
return
}
c.outboundRouteCache.Delete(outboundRoute)
}
}
12 changes: 12 additions & 0 deletions pkg/agent/multicast/mcast_socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc)
}

func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
siocSgReq := multicastsyscall.SiocSgReq{
Src: [4]byte{src[0], src[1], src[2], src[3]},
Grp: [4]byte{group[0], group[1], group[2], group[3]},
}
stats, err := multicastsyscall.IoctlGetSiocSgReq(s.GetFD(), &siocSgReq)
if err != nil {
return 0, err
}
return stats.Pktcnt, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
mc := &multicastsyscall.Mfcctl{}
origin := src.To4()
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/multicast/mcast_socket_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return nil
}

func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
return 0, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/multicast/testing/mock_multicast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/agent/util/syscall/linux/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (

type Mfcctl C.struct_mfcctl
type Vifctl C.struct_vifctl_with_ifindex
type SiocSgReq C.struct_siocsgreq

const SizeofMfcctl = C.sizeof_struct_mfcctl
const SizeofVifctl = C.sizeof_struct_vifctl_with_ifindex
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/util/syscall/syscall_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package syscall

import (
"runtime"
"syscall"
"unsafe"
)
Expand All @@ -34,7 +35,15 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr)
return
}

// Please add your wrapped syscall functions below
func ioctl(fd int, req uint, arg uintptr) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg))
if e1 != 0 {
return e1
}
return
}

// Please add your wrapped syscall functions below.

func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(mfcctl), SizeofMfcctl)
Expand All @@ -43,3 +52,9 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl)
}

func IoctlGetSiocSgReq(fd int, siocsgreq *SiocSgReq) (*SiocSgReq, error) {
err := ioctl(fd, SIOCGETSGCNT, uintptr(unsafe.Pointer(siocsgreq)))
runtime.KeepAlive(siocsgreq)
return siocsgreq, err
}
9 changes: 9 additions & 0 deletions pkg/agent/util/syscall/ztypes_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
MRT_INIT = 0xc8
MRT_FLUSH = 0xd4
MAXVIFS = 0x20
SIOCGETSGCNT = 0x89e1
)

type Mfcctl struct {
Expand All @@ -48,6 +49,14 @@ type Vifctl struct {
Rmt_addr [4]byte /* in_addr */
}

type SiocSgReq = struct {
Src [4]byte /* in_addr */
Grp [4]byte /* in_addr */
Pktcnt uint32
Bytecnt uint32
If uint32
}

const SizeofMfcctl = 0x3c
const SizeofVifctl = 0x10
const SizeofIgmpmsg = 0x14

0 comments on commit 5003af1

Please sign in to comment.