diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 66247216af2..ed3f4dbd93c 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -46,7 +46,7 @@ featureGates: # IPAM when configuring secondary network interfaces with Multus. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "AntreaIPAM" "default" false) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. diff --git a/build/charts/antrea/conf/antrea-controller.conf b/build/charts/antrea/conf/antrea-controller.conf index 440952a8757..745e8df9496 100644 --- a/build/charts/antrea/conf/antrea-controller.conf +++ b/build/charts/antrea/conf/antrea-controller.conf @@ -17,7 +17,7 @@ featureGates: # Enable collecting and exposing NetworkPolicy statistics. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NetworkPolicyStats" "default" true) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable controlling SNAT IPs of Pod egress traffic. diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ffdbe7d4304..03e3874d5a4 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. 
@@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3933,7 +3933,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 7c64611d3a5..5c9fd89a175 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3935,7 +3935,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 3a5dbc75ac4..7822ecf0c82 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 06a718aec3c..04fd7cbf74a 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2596,7 +2596,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2885,7 +2885,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3705,7 +3705,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3991,7 +3991,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 69e328f1dd2..ba216d1e056 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 901d48cae7f..78e715e6070 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -652,7 +652,9 @@ func run(o *Options) error { ovsBridgeClient, podUpdateChannel, o.igmpQueryInterval, - validator) + validator, + networkConfig.TrafficEncapMode.SupportsEncap(), + informerFactory) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 61105539235..03f98df73e4 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -501,8 +501,9 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { if err != nil { return fmt.Errorf("error when retrieving MAC of Node %s: %v", nodeName, err) } - peerNodeIPs, err := c.getNodeTransportAddrs(node) + peerNodeIPs, err := k8s.GetNodeTransportAddrs(node) if err != nil { + klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", node.Name) return err } 
peerWireGuardPublicKey := node.Annotations[types.NodeWireGuardPublicAnnotationKey] @@ -799,23 +800,3 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) { } return mac, nil } - -func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStackIPs, error) { - if c.networkConfig.TransportIface != "" || len(c.networkConfig.TransportIfaceCIDRs) > 0 { - transportAddrs, err := k8s.GetNodeAddrsFromAnnotations(node, types.NodeTransportAddressAnnotationKey) - if err != nil { - return nil, err - } - if transportAddrs != nil { - return transportAddrs, nil - } - klog.InfoS("Transport address is not found, using NodeIP instead", "node", node.Name) - } - // Use NodeIP if the transport IP address is not set or not found. - peerNodeIPs, err := k8s.GetNodeAddrs(node) - if err != nil { - klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", node.Name) - return nil, err - } - return peerNodeIPs, nil -} diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 12a14bbcb16..f557041a257 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -19,9 +19,14 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -35,6 +40,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" ) type eventType uint8 @@ -48,6 +54,13 @@ const ( // How long to wait before retrying the processing of a multicast group change. 
minRetryDelay = 5 * time.Second maxRetryDelay = 300 * time.Second + + // Interval of reprocessing every node. + nodeResyncPeriod = 60 * time.Second + + // nodeUpdateKey is a key to trigger the Node list operation and update the OpenFlow group buckets to report + // the local multicast groups to other Nodes. + nodeUpdateKey = "nodeUpdate" ) var ( @@ -61,13 +74,18 @@ type mcastGroupEvent struct { eType eventType time time.Time iface *interfacestore.InterfaceConfig + // srcNode is the Node IP where the IGMP report message is sent from. It is set only with encap mode. + srcNode net.IP } type GroupMemberStatus struct { group net.IP // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, // and value is its last update time. - localMembers map[string]time.Time + localMembers map[string]time.Time + // remoteMembers is a set for Nodes which have joined the multicast group in the cluster. The Node's IP is + // added in the set. + remoteMembers sets.String lastIGMPReport time.Time ofGroupID binding.GroupIDType } @@ -91,11 +109,12 @@ func (c *Controller) eventHandler(stopCh <-chan struct{}) { // addGroupMemberStatus adds the new group into groupCache. 
func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ - group: e.group, - lastIGMPReport: e.time, - localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, - ofGroupID: c.v4GroupAllocator.Allocate(), + group: e.group, + ofGroupID: c.v4GroupAllocator.Allocate(), + remoteMembers: sets.NewString(), + localMembers: make(map[string]time.Time), } + status = addGroupMember(status, e) c.groupCache.Add(status) c.queue.Add(e.group.String()) klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) @@ -112,17 +131,17 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent newStatus := &GroupMemberStatus{ group: status.group, localMembers: make(map[string]time.Time), + remoteMembers: status.remoteMembers, lastIGMPReport: status.lastIGMPReport, ofGroupID: status.ofGroupID, } for m, t := range status.localMembers { newStatus.localMembers[m] = t } - _, exist := status.localMembers[e.iface.InterfaceName] + exist := memberExists(status, e) switch e.eType { case groupJoin: - newStatus.lastIGMPReport = e.time - newStatus.localMembers[e.iface.InterfaceName] = e.time + newStatus = addGroupMember(newStatus, e) c.groupCache.Update(newStatus) if !exist { klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) @@ -130,18 +149,21 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent } case groupLeave: if exist { - delete(newStatus.localMembers, e.iface.InterfaceName) + newStatus = deleteGroupMember(newStatus, e) c.groupCache.Update(newStatus) - klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) - // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are - // other local members in the multicast group. 
- if !found || len(newStatus.localMembers) > 0 { - c.queue.Add(newStatus.group.String()) + if e.iface.Type == interfacestore.ContainerInterface { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. + klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.checkLastMember(e.group) + } } else { - // Check if all local members have left the multicast group. - klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - c.checkLastMember(e.group) + c.queue.Add(newStatus.group.String()) } } } @@ -175,6 +197,7 @@ func (c *Controller) clearStaleGroups() { if now.Sub(lastUpdate) > c.mcastGroupTimeout { ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: member, + Type: interfacestore.ContainerInterface, } event := &mcastGroupEvent{ group: status.group, @@ -200,6 +223,7 @@ func (c *Controller) removeLocalInterface(podEvent types.PodUpdate) { interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: interfaceName, + Type: interfacestore.ContainerInterface, } groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) for _, g := range groupStatuses { @@ -222,17 +246,31 @@ type Controller struct { groupEventCh chan *mcastGroupEvent groupCache cache.Indexer queue workqueue.RateLimitingInterface - // installedGroups saves the groups which are configured on both OVS and the host. 
+ nodeInformer coreinformers.NodeInformer + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + nodeUpdateQueue workqueue.RateLimitingInterface + // installedGroups saves the groups which are configured on OVS. + // With encap mode, the entries in installedGroups include all multicast groups identified in the cluster. installedGroups sets.String installedGroupsMutex sync.RWMutex - mRouteClient *MRouteClient - ovsBridgeClient ovsconfig.OVSBridgeClient + // installedLocalGroups saves the groups which are configured on OVS and host. The entries in installedLocalGroups + // include the multicast groups that local Pod members join. + installedLocalGroups sets.String + installedLocalGroupsMutex sync.RWMutex + mRouteClient *MRouteClient + ovsBridgeClient ovsconfig.OVSBridgeClient // queryInterval is the interval to send IGMP query messages. queryInterval time.Duration // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. mcastGroupTimeout time.Duration // the group ID in OVS for group which IGMP queries are sent to queryGroupId binding.GroupIDType + // nodeGroupID is the OpenFlow group ID in OVS which is used to send IGMP report messages to other Nodes. + nodeGroupID binding.GroupIDType + // installedNodes is the installed Node set that the IGMP report message is sent to. 
+ installedNodes sets.String + encapEnabled bool } func NewMulticastController(ofClient openflow.Client, @@ -244,28 +282,54 @@ func NewMulticastController(ofClient openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, podUpdateSubscriber channel.Subscriber, igmpQueryInterval time.Duration, - validator types.McastNetworkPolicyController) *Controller { + validator types.McastNetworkPolicyController, + isEncap bool, + informerFactory informers.SharedInformerFactory) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) - groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) + multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap) c := &Controller{ - ofClient: ofClient, - ifaceStore: ifaceStore, - v4GroupAllocator: v4GroupAllocator, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, - queryInterval: igmpQueryInterval, - mcastGroupTimeout: igmpQueryInterval * 3, - queryGroupId: v4GroupAllocator.Allocate(), + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + installedLocalGroups: sets.NewString(), + queue: 
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, + queryInterval: igmpQueryInterval, + mcastGroupTimeout: igmpQueryInterval * 3, + queryGroupId: v4GroupAllocator.Allocate(), + encapEnabled: isEncap, + } + if isEncap { + c.nodeGroupID = v4GroupAllocator.Allocate() + c.installedNodes = sets.NewString() + c.nodeInformer = informerFactory.Core().V1().Nodes() + c.nodeLister = c.nodeInformer.Lister() + c.nodeListerSynced = c.nodeInformer.Informer().HasSynced + c.nodeUpdateQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "nodeUpdate") + c.nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + c.nodeUpdateQueue.Add(nodeUpdateKey) + }, + UpdateFunc: func(old, cur interface{}) { + c.checkNodeUpdate(old, cur) + }, + DeleteFunc: func(old interface{}) { + c.nodeUpdateQueue.Add(nodeUpdateKey) + }, + }, + nodeResyncPeriod, + ) } podUpdateSubscriber.Subscribe(c.memberChanged) return c @@ -290,6 +354,16 @@ func (c *Controller) Initialize() error { if err != nil { return err } + if c.encapEnabled { + // Install OpenFlow group to send the multicast groups that local Pods joined to all other Nodes in the cluster. 
+ if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nil); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + if err := c.ofClient.InstallMulticastRemoteReportFlows(c.nodeGroupID); err != nil { + klog.ErrorS(err, "Failed to install OpenFlow group and flow to send IGMP report to other Nodes") + return err + } + } return nil } @@ -301,6 +375,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } }, c.queryInterval, stopCh) + if c.encapEnabled { + go wait.NonSlidingUntil(c.syncLocalGroupsToOtherNodes, c.queryInterval, stopCh) + go wait.Until(c.nodeWorker, time.Second, stopCh) + } + // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, c.queryInterval, stopCh) go c.eventHandler(stopCh) @@ -380,54 +459,107 @@ func (c *Controller) syncGroup(groupKey string) error { } memberPorts = append(memberPorts, uint32(obj.OFPort)) } - if c.groupHasInstalled(groupKey) { - if c.groupIsStale(status) { - // Remove the multicast flow entry if no local Pod is in the group. 
- if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + var remoteNodeReceivers []net.IP + if c.encapEnabled { + remoteNodeReceivers = make([]net.IP, 0, len(status.remoteMembers)) + for member := range status.remoteMembers { + remoteNodeReceivers = append(remoteNodeReceivers, net.ParseIP(member)) + } + } + installLocalMulticastGroup := func() error { + if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { + klog.ErrorS(err, "Failed to install multicast group identified with local members", "group", groupKey) + return err + } + if c.encapEnabled { + if err := c.igmpSnooper.sendIGMPJoinReport([]net.IP{status.group}); err != nil { + klog.ErrorS(err, "Failed to sync local multicast group to other Nodes", "group", groupKey) return err } - // Remove the multicast flow entry if no local Pod is in the group. - if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) - return err + } + c.addInstalledLocalGroup(groupKey) + klog.InfoS("New local multicast group is added", "group", groupKey) + return nil + } + deleteLocalMulticastGroup := func() error { + err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + if err != nil { + klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) + return err + } + klog.InfoS("Removed multicast route entry", "group", status.group) + err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) + if err != nil { + klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + return err + } + + if c.encapEnabled { + group := net.ParseIP(groupKey) + // Send IGMP leave message to other Nodes to notify the current Node leaves the given multicast group. 
+ if err := c.igmpSnooper.sendIGMPLeaveReport([]net.IP{group}); err != nil { + klog.ErrorS(err, "Failed to send IGMP leave message to other Nodes", "group", groupKey) } - c.v4GroupAllocator.Release(status.ofGroupID) - err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) - if err != nil { - klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) - return err + } + c.delInstalledLocalGroup(groupKey) + return nil + } + if c.groupHasInstalled(groupKey) { + if c.groupIsStale(status) { + if c.localGroupHasInstalled(groupKey) { + if err := deleteLocalMulticastGroup(); err != nil { + return err + } + } + // TODO: add check on the stale multicast group that is joined by the Pods on a different Node. + // remoteMembers is always empty with noEncap mode. + if status.remoteMembers.Len() == 0 { + // Remove the multicast OpenFlow flow and group entries if no Pod member on a local or remote Node is in the group. + if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + return err + } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } + c.v4GroupAllocator.Release(status.ofGroupID) + c.delInstalledGroup(groupKey) + c.groupCache.Delete(status) + klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil + } - err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) - if err != nil { - klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + } else if !c.localGroupHasInstalled(groupKey) { + // Install multicast flows and routing entries for the multicast group that local Pods join. 
+ if err := installLocalMulticastGroup(); err != nil { return err } - c.delInstalledGroup(groupKey) - c.groupCache.Delete(status) - klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) - return nil } - // Reinstall OpenFlow group because the local Pod receivers have changed. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + // Reinstall OpenFlow group because either the remote node receivers or local Pod receivers have changed. + klog.V(2).InfoS("Updating OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.InfoS("Updated OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) return nil } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.V(2).InfoS("Installed OpenFlow group for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. 
if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } - if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { - klog.ErrorS(err, "Failed to join multicast group for multicast interfaces", "group", status.group) - return err + klog.InfoS("Installed OpenFlow flows for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if len(status.localMembers) > 0 { + err := installLocalMulticastGroup() + if err != nil { + return err + } } c.addInstalledGroup(groupKey) return nil @@ -461,6 +593,24 @@ func (c *Controller) delInstalledGroup(groupKey string) { c.installedGroupsMutex.Unlock() } +func (c *Controller) localGroupHasInstalled(groupKey string) bool { + c.installedLocalGroupsMutex.RLock() + defer c.installedLocalGroupsMutex.RUnlock() + return c.installedLocalGroups.Has(groupKey) +} + +func (c *Controller) addInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Insert(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + +func (c *Controller) delInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Delete(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { obj, ok, _ := c.groupCache.GetByKey(e.group.String()) switch e.eType { @@ -509,7 +659,7 @@ func (c *Controller) updateQueryGroup() error { memberPorts = append(memberPorts, uint32(iface.OFPort)) } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. 
- if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts, nil); err != nil { return err } klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", types.McastAllHosts.String(), @@ -517,6 +667,64 @@ func (c *Controller) updateQueryGroup() error { return nil } +// syncLocalGroupsToOtherNodes sends IGMP join message to other Nodes in the same cluster to notify what multicast groups +// are joined by this Node. This function is used only with encap mode. +func (c *Controller) syncLocalGroupsToOtherNodes() { + if c.installedLocalGroups.Len() == 0 { + return + } + localGroups := make([]net.IP, 0, c.installedLocalGroups.Len()) + c.installedLocalGroupsMutex.RLock() + for group := range c.installedLocalGroups { + localGroups = append(localGroups, net.ParseIP(group)) + } + c.installedLocalGroupsMutex.RUnlock() + if err := c.igmpSnooper.sendIGMPJoinReport(localGroups); err != nil { + klog.ErrorS(err, "Failed to sync local multicast groups to other Nodes") + } +} + +func (c *Controller) syncNodes() error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing Node IPs. 
(%v)", time.Since(startTime)) + }() + + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Failed to list Nodes") + return err + } + var updatedNodeIPs []net.IP + updatedNodeIPSet := sets.NewString() + for _, n := range nodes { + if n.Name == c.nodeConfig.Name { + continue + } + nip, err := k8s.GetNodeTransportAddrs(n) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", n.Name) + return err + } + if nip.IPv4 != nil { + updatedNodeIPs = append(updatedNodeIPs, nip.IPv4) + updatedNodeIPSet.Insert(nip.IPv4.String()) + } + } + if c.installedNodes.Equal(updatedNodeIPSet) { + klog.V(2).InfoS("Nodes in the cluster are not changed, ignore the event") + return nil + } + if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, updatedNodeIPs); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + return err + } + c.installedNodes = updatedNodeIPSet + // Notify local installed multicast groups to other Nodes in the cluster. 
+ c.syncLocalGroupsToOtherNodes() + return nil +} + func podInterfaceIndexFunc(obj interface{}) ([]string, error) { groupState := obj.(*GroupMemberStatus) podInterfaces := make([]string, 0, len(groupState.localMembers)) @@ -596,3 +804,94 @@ func (c *Controller) GetAllPodsStats() map[*interfacestore.InterfaceConfig]*PodT } return statsMap } + +func (c *Controller) checkNodeUpdate(old interface{}, cur interface{}) { + oldNode := old.(*corev1.Node) + if oldNode.Name == c.nodeConfig.Name { + return + } + curNode := cur.(*corev1.Node) + oldIPs, err := k8s.GetNodeTransportAddrs(oldNode) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node old IP addresses", "node", oldNode.Name) + return + } + newIPs, err := k8s.GetNodeTransportAddrs(curNode) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node current IP addresses", "node", curNode.Name) + return + } + if (*newIPs).Equal(*oldIPs) { + return + } + c.nodeUpdateQueue.Add(nodeUpdateKey) +} + +func (c *Controller) nodeWorker() { + for c.processNextNodeItem() { + } +} + +func (c *Controller) processNextNodeItem() bool { + obj, quit := c.nodeUpdateQueue.Get() + if quit { + return false + } + // We call Done here so the workqueue knows we have finished processing this item. We also + // must remember to call Forget if we do not want this work item being re-queued. For + // example, we do not call Forget if a transient error occurs, instead the item is put back + // on the workqueue and attempted again after a back-off period. + defer c.nodeUpdateQueue.Done(obj) + + // We expect strings (Node name) to come off the workqueue. + if key, ok := obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen: only a constant string enqueues nodeUpdateQueue. 
+ c.nodeUpdateQueue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncNodes(); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.nodeUpdateQueue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.nodeUpdateQueue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing Nodes, requeuing") + } + return true +} + +func memberExists(status *GroupMemberStatus, e *mcastGroupEvent) bool { + var exist bool + if e.iface.Type == interfacestore.ContainerInterface { + _, exist = status.localMembers[e.iface.InterfaceName] + } else if e.iface.Type == interfacestore.TunnelInterface { + exist = status.remoteMembers.Has(e.srcNode.String()) + } + return exist +} + +func addGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + status.localMembers[e.iface.InterfaceName] = e.time + status.lastIGMPReport = e.time + klog.V(2).InfoS("Added local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers.Insert(e.srcNode.String()) + klog.V(2).InfoS("Added remote member from multicast group", "group", e.group.String(), "member", e.srcNode) + } + return status +} + +func deleteGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + delete(status.localMembers, e.iface.InterfaceName) + klog.V(2).InfoS("Deleted local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers.Delete(e.srcNode.String()) + klog.V(2).InfoS("Deleted remote member from multicast group", "group", e.group.String(), "member", e.srcNode) + } + return status +} diff --git a/pkg/agent/multicast/mcast_controller_test.go 
b/pkg/agent/multicast/mcast_controller_test.go index a0cfd86c814..af1ec593150 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -1,6 +1,3 @@ -//go:build linux -// +build linux - // Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,19 +15,26 @@ package multicast import ( + "context" + "fmt" "net" + "os" "sync" "testing" "time" - "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/agent/config" @@ -53,6 +57,8 @@ var ( mockIfaceStore *ifaceStoretest.MockInterfaceStore mockMulticastValidator *typestest.MockMcastNetworkPolicyController ovsClient *ovsconfigtest.MockOVSBridgeClient + clientset *fake.Clientset + informerFactory informers.SharedInformerFactory if1 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if1", @@ -69,10 +75,7 @@ var ( OFPort: 2, }, } - nodeIf1IP = net.ParseIP("192.168.20.22") - externalInterfaceIP = net.ParseIP("192.168.50.23") - pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") - pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") + nodeIf1IP = net.ParseIP("192.168.20.22") ) func TestAddGroupMemberStatus(t *testing.T) { @@ -83,7 +86,7 @@ func TestAddGroupMemberStatus(t *testing.T) { time: time.Now(), iface: if1, } - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: 
&net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -97,7 +100,7 @@ func TestAddGroupMemberStatus(t *testing.T) { assert.True(t, ok) assert.Equal(t, mgroup.String(), key) mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) @@ -106,10 +109,9 @@ func TestAddGroupMemberStatus(t *testing.T) { } func TestUpdateGroupMemberStatus(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) assert.Nil(t, err) - igmpMaxResponseTime = time.Second * 1 mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, @@ -143,9 +145,8 @@ func TestUpdateGroupMemberStatus(t *testing.T) { } func TestCheckLastMember(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 - igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { @@ -195,7 +196,7 @@ func TestCheckLastMember(t *testing.T) { mctrl.queue.Forget(obj) } mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -209,7 +210,7 @@ func TestCheckLastMember(t *testing.T) { } func TestClearStaleGroups(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := 
newMockMulticastController(t, false) workerCount = 1 err := mctrl.initialize(t) assert.Nil(t, err) @@ -253,12 +254,14 @@ func TestClearStaleGroups(t *testing.T) { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) } fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) for m := range g.localMembers { mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) @@ -283,7 +286,7 @@ func TestClearStaleGroups(t *testing.T) { } func TestProcessPacketIn(t *testing.T) { - mockController := newMockMulticastController(t) + mockController := newMockMulticastController(t, false) snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) @@ -449,6 +452,291 @@ func TestProcessPacketIn(t *testing.T) { } } +func TestEncapModeInitialize(t *testing.T) { + mockController := newMockMulticastController(t, true) + assert.True(t, mockController.nodeGroupID != 0) + err := mockController.initialize(t) + assert.Nil(t, err) +} + +func TestEncapLocalReportAndNotifyRemote(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ + {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + go wait.Until(mockController.worker, time.Second, stopCh) + + iface1 := createInterface("pod1", 3) + iface2 := createInterface("pod2", 4) + mgroup := net.ParseIP("224.2.100.4") + for _, tc := range []struct { + e *mcastGroupEvent + interfaces []*interfacestore.InterfaceConfig + 
groupChanged bool + ifaceCheck bool + }{ + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: false, ifaceCheck: false}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + } { + groupKey := tc.e.group.String() + if tc.e.eType == groupJoin { + if tc.groupChanged { + mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) + } + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } else { + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + if len(tc.interfaces) == 1 { + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, types.McastAllHosts, gomock.Any(), gomock.Any()).AnyTimes() + } + if !tc.groupChanged { + 
mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } + if tc.groupChanged { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group) + mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + } + } + mockController.addOrUpdateGroupEvent(tc.e) + + if tc.groupChanged { + err := wait.PollImmediate(time.Millisecond*100, time.Second*3, func() (done bool, err error) { + if tc.e.eType == groupJoin { + return mockController.localGroupHasInstalled(groupKey) && mockController.groupHasInstalled(groupKey), nil + } else { + return !mockController.localGroupHasInstalled(groupKey) && !mockController.groupHasInstalled(groupKey), nil + } + }) + assert.Nil(t, err) + } else { + time.Sleep(time.Millisecond * 200) + } + } +} + +func TestNodeUpdate(t *testing.T) { + mockController := newMockMulticastController(t, true) + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + mockController.addInstalledLocalGroup("224.2.100.1") + + wg := sync.WaitGroup{} + for _, tc := range []struct { + addedNodes map[string]map[string]string + deletedNodes []string + expectedNodes sets.String + }{ + { + addedNodes: map[string]map[string]string{ + "n1": {"ip": "10.10.10.11"}, + "n2": {"ip": "10.10.10.12"}, + }, + expectedNodes: sets.NewString("10.10.10.11", "10.10.10.12"), + }, + { + addedNodes: map[string]map[string]string{ + "n3": {"ip": "10.10.10.13"}, + }, + expectedNodes: sets.NewString("10.10.10.11", "10.10.10.12", "10.10.10.13"), + }, + { + deletedNodes: []string{ + "n1", + }, + expectedNodes: sets.NewString("10.10.10.12", "10.10.10.13"), + }, + { + addedNodes: map[string]map[string]string{ + "n4": {"ip": "10.10.10.14", "label": "10.10.10.24"}, + }, + deletedNodes: 
[]string{ + "n2", + }, + expectedNodes: sets.NewString("10.10.10.13", "10.10.10.24"), + }, + } { + times := len(tc.addedNodes) + len(tc.deletedNodes) + mockOFClient.EXPECT().InstallMulticastGroup(mockController.nodeGroupID, nil, gomock.Any()).Return(nil).Times(times) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()).Times(times) + wg.Add(1) + + go func() { + for name, cfg := range tc.addedNodes { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + ip, exist := cfg["ip"] + if exist { + node.Status = corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: ip, + }, + }, + } + } + label, exist := cfg["label"] + if exist { + node.Annotations = map[string]string{ + types.NodeTransportAddressAnnotationKey: label, + } + } + clientset.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + mockController.processNextNodeItem() + } + for _, name := range tc.deletedNodes { + clientset.CoreV1().Nodes().Delete(context.TODO(), name, metav1.DeleteOptions{}) + mockController.processNextNodeItem() + } + wg.Done() + }() + + wg.Wait() + assert.Equal(t, tc.expectedNodes, mockController.installedNodes, fmt.Sprintf("installedNodes: %v, expectedNodes: %v", mockController.installedNodes, tc.expectedNodes)) + } +} + +func TestRemoteMemberJoinLeave(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + stopStr := "done" + eventHandler := func(stopCh <-chan struct{}) { + for { + select { + case e := <-mockController.groupEventCh: + if e.group.Equal(net.IPv4zero) { + mockController.queue.Add(stopStr) + } else { + mockController.addOrUpdateGroupEvent(e) + } + case <-stopCh: + return + } + } + } + go eventHandler(stopCh) + + for _, tc := range []struct { + groupStrs []string + nodeStr string + isJoin bool + }{ + {groupStrs: 
[]string{"224.2.100.2", "224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.2", "224.2.100.5"}, nodeStr: "10.10.10.12", isJoin: true}, + {groupStrs: []string{"224.2.100.2"}, nodeStr: "10.10.10.12", isJoin: false}, + } { + groups := make([]net.IP, len(tc.groupStrs)) + for i, g := range tc.groupStrs { + groups[i] = net.ParseIP(g) + } + node := net.ParseIP(tc.nodeStr) + testRemoteReport(t, mockController, groups, node, tc.isJoin, stopStr) + } +} + +func testRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, node net.IP, nodeJoin bool, stopStr string) { + tunnelPort := uint32(2) + proto := uint8(protocol.IGMPIsEx) + if !nodeJoin { + proto = uint8(protocol.IGMPToIn) + } + for _, g := range groups { + var exists bool + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + if !exists { + mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) + } else { + status := obj.(*GroupMemberStatus) + exists = status.remoteMembers.Has(node.String()) + if nodeJoin && exists || !nodeJoin && !exists { + continue + } + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), []uint32{config.HostGatewayOFPort}, gomock.Any()) + } + + processNextItem := func(stopStr string) { + for { + obj, quit := mockController.queue.Get() + if quit { + return + } + key := obj.(string) + if key == stopStr { + mockController.queue.Forget(key) + mockController.queue.Done(obj) + return + } + if err := mockController.syncGroup(key); err != nil { + t.Errorf("Failed to process %s: %v", key, err) + } + mockController.queue.Forget(key) + mockController.queue.Done(obj) + } + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + processNextItem(stopStr) + wg.Done() + }() + + err := processRemoteReport(t, mockController, groups, node, proto, tunnelPort) + assert.Nil(t, err) + mockController.groupEventCh <- &mcastGroupEvent{group: net.IPv4zero} 
+ wg.Wait() + + for _, g := range groups { + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + assert.True(t, exists) + status := obj.(*GroupMemberStatus) + if nodeJoin { + assert.True(t, status.remoteMembers.Has(node.String())) + } else { + assert.False(t, status.remoteMembers.Has(node.String())) + } + } + for _, g := range groups { + assert.True(t, mockController.groupHasInstalled(g.String())) + } +} + +func processRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, remoteNode net.IP, reportType uint8, tunnelPort uint32) error { + pkt := generatePacketInForRemoteReport(t, mockController.igmpSnooper, groups, remoteNode, reportType, tunnelPort) + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, nodeIf1IP), true) + return mockController.igmpSnooper.processPacketIn(&pkt) +} + func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) { obj, exits, err := cache.GetByKey(event.group.String()) assert.Nil(t, err) @@ -467,7 +755,7 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven } } -func newMockMulticastController(t *testing.T) *Controller { +func newMockMulticastController(t *testing.T, isEncap bool) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) @@ -479,17 +767,24 @@ func newMockMulticastController(t *testing.T) *Controller { mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) groupAllocator := openflow.NewGroupAllocator(false) podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator) + + clientset = fake.NewSimpleClientset() + informerFactory = 
informers.NewSharedInformerFactory(clientset, 12*time.Hour) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator, isEncap, informerFactory) return mctrl } func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastInitialFlows(uint8(0)).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{}) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(0)).Times(1).Return([]uint16{0}, nil) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil) + if c.encapEnabled { + mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastRemoteReportFlows(c.nodeGroupID).Times(1) + } return c.Initialize() } @@ -509,32 +804,13 @@ func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig func createIGMPReportPacketIn(joinedGroups []net.IP, leftGroups []net.IP, version uint8, ofport uint32) []*ofctrl.PacketIn { joinMessages := createIGMPJoinMessage(joinedGroups, version) leaveMessages := createIGMPLeaveMessage(leftGroups, version) - generatePacket := func(m util.Message) ofctrl.PacketIn { - pkt := openflow13.NewPacketIn() - matchInport := openflow13.NewInPortField(ofport) - pkt.Match.AddField(*matchInport) - ipPacket := &protocol.IPv4{ - Version: 0x4, - IHL: 5, - Protocol: IGMPProtocolNumber, - Length: 20 + m.Len(), - Data: m, - } - pkt.Data = protocol.Ethernet{ - HWDst: pktInDstMAC, - HWSrc: pktInSrcMAC, - Ethertype: 
protocol.IPv4_MSG, - Data: ipPacket, - } - return ofctrl.PacketIn(*pkt) - } pkts := make([]*ofctrl.PacketIn, 0) for _, m := range joinMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } for _, m := range leaveMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } return pkts @@ -578,3 +854,8 @@ func createIGMPJoinMessage(groups []net.IP, version uint8) []util.Message { } return pkts } + +func TestMain(m *testing.M) { + igmpMaxResponseTime = time.Second + os.Exit(m.Run()) +} diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d6ffcef5a75..648163dd367 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -39,6 +39,11 @@ const ( IGMPProtocolNumber = 2 ) +const ( + openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" + openflowKeyInPort = "OXM_OF_IN_PORT" +) + var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -46,6 +51,8 @@ var ( igmpMaxResponseTime = time.Second * 10 // igmpQueryDstMac is the MAC address used in the dst MAC field in the IGMP query message igmpQueryDstMac, _ = net.ParseMAC("01:00:5e:00:00:01") + // igmpReportDstMac is the MAC address used in the dst MAC field in the IGMP report message + igmpReportDstMac, _ = net.ParseMAC("01:00:5e:00:00:16") ) type IGMPSnooper struct { @@ -62,6 +69,7 @@ type IGMPSnooper struct { // Similar to igmpReportANPStats, it stores ACNP stats for IGMP reports. 
igmpReportACNPStats map[apitypes.UID]map[string]*types.RuleMetric igmpReportACNPStatsMutex sync.Mutex + encapEnabled bool } func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { @@ -92,7 +100,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32, func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + ofPortField := matches.GetMatchByName(openflowKeyInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -127,6 +135,11 @@ func (s *IGMPSnooper) validate(event *mcastGroupEvent, igmpType uint8, packetInD // Return true directly if there is no validator. return true, nil } + // MulticastValidator only validates the IGMP report message sent from Pods. The report message received from tunnel + // port is sent from Antrea Agent on a different Node, and returns true directly. + if event.iface.Type == interfacestore.TunnelInterface { + return true, nil + } if event.iface.Type != interfacestore.ContainerInterface { return true, fmt.Errorf("interface is not container") } @@ -201,6 +214,42 @@ func (s *IGMPSnooper) collectStats() (igmpANPStats, igmpACNPStats map[apitypes.U return igmpANPStats, igmpACNPStats } +func (s *IGMPSnooper) sendIGMPReport(groupRecordType uint8, groups []net.IP) error { + igmp, err := s.generateIGMPReportPacket(groupRecordType, groups) + if err != nil { + return err + } + if err := s.ofClient.SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, igmp); err != nil { + return err + } + klog.V(2).InfoS("Sent packetOut for IGMP v3 report", "groups", groups) + return nil +} + +func (s *IGMPSnooper) generateIGMPReportPacket(groupRecordType uint8, groups []net.IP) (util.Message, error) { + records := make([]protocol.IGMPv3GroupRecord, len(groups)) + for i, group := range groups { + records[i] = protocol.IGMPv3GroupRecord{ + Type: 
groupRecordType, + MulticastAddress: group, + } + } + return &protocol.IGMPv3MembershipReport{ + Type: protocol.IGMPv3Report, + Checksum: 0, + NumberOfGroups: uint16(len(records)), + GroupRecords: records, + }, nil +} + +func (s *IGMPSnooper) sendIGMPJoinReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPIsEx, groups) +} + +func (s *IGMPSnooper) sendIGMPLeaveReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPToIn, groups) +} + func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { now := time.Now() iface, err := s.parseSrcInterface(pktIn) @@ -209,8 +258,15 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { } klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort) podName := "unknown" + var srcNode net.IP if iface.Type == interfacestore.ContainerInterface { podName = iface.PodName + } else if iface.Type == interfacestore.TunnelInterface { + var err error + srcNode, err = s.parseSrcNode(pktIn) + if err != nil { + return err + } } igmp, err := parseIGMPPacket(pktIn.Data) if err != nil { @@ -240,10 +296,11 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { evtType = groupLeave } event := &mcastGroupEvent{ - group: mgroup, - eType: evtType, - time: now, - iface: iface, + group: mgroup, + eType: evtType, + time: now, + iface: iface, + srcNode: srcNode, } s.validatePacketAndNotify(event, igmpType, pktIn.Data) } @@ -261,6 +318,16 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { return nil } +func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { + matches := pktIn.GetMatches() + tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + if tunSrcField == nil { + return nil, errors.New("in_port field not found") + } + tunSrc := tunSrcField.GetValue().(net.IP) + return tunSrc, nil +} + func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) { // The max 
response time field in IGMP protocol uses a value in units of 1/10 second. // See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 @@ -332,8 +399,8 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { } } -func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController) *IGMPSnooper { - snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval} +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController, encapEnabled bool) *IGMPSnooper { + snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval, encapEnabled: encapEnabled} snooper.igmpReportACNPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) snooper.igmpReportANPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", snooper) diff --git a/pkg/agent/multicast/mcast_discovery_test.go b/pkg/agent/multicast/mcast_discovery_test.go new file mode 100644 index 00000000000..3af352277f3 --- /dev/null +++ b/pkg/agent/multicast/mcast_discovery_test.go @@ -0,0 +1,185 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "net" + "sync" + "testing" + + "antrea.io/libOpenflow/openflow13" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + + "antrea.io/antrea/pkg/agent/interfacestore" + ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +var ( + pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") + pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") +) + +type snooperValidator struct { + eventCh chan *mcastGroupEvent + groupJoinedNodes map[string]sets.String + groupLeftNodes map[string]sets.String +} + +func (v *snooperValidator) processPackets(expectedPackets int) { + appendSrcNode := func(groupKey string, groupNodes map[string]sets.String, nodeIP net.IP) map[string]sets.String { + _, exists := groupNodes[groupKey] + if !exists { + groupNodes[groupKey] = sets.NewString() + } + groupNodes[groupKey] = groupNodes[groupKey].Insert(nodeIP.String()) + return groupNodes + } + for i := 0; i < expectedPackets; i++ { + select { + case e := <-v.eventCh: + groupKey := e.group.String() + if e.eType == groupJoin { + v.groupJoinedNodes = appendSrcNode(groupKey, v.groupJoinedNodes, e.srcNode) + } else { + v.groupLeftNodes = appendSrcNode(groupKey, v.groupLeftNodes, e.srcNode) + } + } + } +} + +func TestIGMPRemoteReport(t *testing.T) { + controller := gomock.NewController(t) 
+ mockOFClient := openflowtest.NewMockClient(controller) + mockIfaceStore := ifaceStoretest.NewMockInterfaceStore(controller) + eventCh := make(chan *mcastGroupEvent, 100) + snooper := &IGMPSnooper{ofClient: mockOFClient, eventCh: eventCh, ifaceStore: mockIfaceStore} + + localNodeIP := net.ParseIP("1.2.3.4") + tunnelPort := uint32(1) + wg := sync.WaitGroup{} + + generateRemotePackets := func(groups []net.IP, nodes []net.IP, igmpMsgType uint8) []ofctrl.PacketIn { + packets := make([]ofctrl.PacketIn, 0, len(nodes)) + for _, srcNode := range nodes { + pkt := generatePacketInForRemoteReport(t, snooper, groups, srcNode, igmpMsgType, tunnelPort) + packets = append(packets, pkt) + } + return packets + } + validateGroupNodes := func(groups []net.IP, expectedNodesIPs []net.IP, testGroupNodes map[string]sets.String) { + if len(expectedNodesIPs) == 0 { + return + } + for _, g := range groups { + expectedNodes := sets.NewString() + for _, n := range expectedNodesIPs { + expectedNodes.Insert(n.String()) + } + nodes, exists := testGroupNodes[g.String()] + assert.True(t, exists) + assert.True(t, nodes.HasAll(expectedNodes.List()...)) + } + } + testPacketProcess := func(groups []net.IP, joinedNodes []net.IP, leftNodes []net.IP) { + validator := snooperValidator{eventCh: eventCh, groupJoinedNodes: make(map[string]sets.String), groupLeftNodes: make(map[string]sets.String)} + packets := make([]ofctrl.PacketIn, 0, len(joinedNodes)+len(leftNodes)) + packets = append(packets, generateRemotePackets(groups, joinedNodes, protocol.IGMPIsEx)...) + packets = append(packets, generateRemotePackets(groups, leftNodes, protocol.IGMPToIn)...) 
+ + eventCount := len(groups) * len(packets) + wg.Add(1) + go func() { + validator.processPackets(eventCount) + wg.Done() + }() + + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, localNodeIP), true).Times(len(packets)) + for i := range packets { + pkt := &packets[i] + err := snooper.processPacketIn(pkt) + assert.Nil(t, err, "Failed to process IGMP Report message") + } + + wg.Wait() + + validateGroupNodes(groups, joinedNodes, validator.groupJoinedNodes) + validateGroupNodes(groups, leftNodes, validator.groupLeftNodes) + } + + for _, tc := range []struct { + groupsStrings []string + joinedNodesStrings []string + leftNodesStrings []string + }{ + {groupsStrings: []string{"225.1.2.3", "225.1.2.4"}, joinedNodesStrings: []string{"1.2.3.5", "1.2.3.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + {groupsStrings: []string{"225.1.2.5"}, joinedNodesStrings: []string{"1.2.3.5"}}, + {groupsStrings: []string{"225.1.2.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + } { + var groups, joinedNodes, leftNodes []net.IP + for _, g := range tc.groupsStrings { + groups = append(groups, net.ParseIP(g)) + } + for _, n := range tc.joinedNodesStrings { + joinedNodes = append(joinedNodes, net.ParseIP(n)) + } + for _, n := range tc.leftNodesStrings { + leftNodes = append(leftNodes, net.ParseIP(n)) + } + testPacketProcess(groups, joinedNodes, leftNodes) + } +} + +func generatePacket(m util.Message, ofport uint32, srcNodeIP net.IP) ofctrl.PacketIn { + pkt := openflow13.NewPacketIn() + matchInport := openflow13.NewInPortField(ofport) + pkt.Match.AddField(*matchInport) + if srcNodeIP != nil { + matchTunSrc := openflow13.NewTunnelIpv4SrcField(srcNodeIP, nil) + pkt.Match.AddField(*matchTunSrc) + } + ipPacket := &protocol.IPv4{ + Version: 0x4, + IHL: 5, + Protocol: IGMPProtocolNumber, + Length: 20 + m.Len(), + Data: m, + } + pkt.Data = protocol.Ethernet{ + HWDst: pktInDstMAC, + HWSrc: pktInSrcMAC, + Ethertype: protocol.IPv4_MSG, + Data: ipPacket, + } + 
return ofctrl.PacketIn(*pkt) +} + +func generatePacketInForRemoteReport(t *testing.T, snooper *IGMPSnooper, groups []net.IP, srcNode net.IP, igmpMsgType uint8, tunnelPort uint32) ofctrl.PacketIn { + msg, err := snooper.generateIGMPReportPacket(igmpMsgType, groups) + assert.Nil(t, err, "Failed to generate IGMP Report message") + return generatePacket(msg, tunnelPort, srcNode) +} + +func createTunnelInterface(tunnelPort uint32, localNodeIP net.IP) *interfacestore.InterfaceConfig { + tunnelInterface := interfacestore.NewTunnelInterface("antrea-tun0", ovsconfig.GeneveTunnel, localNodeIP, false) + tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: int32(tunnelPort)} + return tunnelInterface +} diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index 279c11c4f19..bf2b9ea1251 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -32,7 +32,7 @@ const ( MulticastRecvBufferSize = 128 ) -func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String) *MRouteClient { +func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String, encapEnabled bool) *MRouteClient { var m = &MRouteClient{ igmpMsgChan: make(chan []byte, workerCount), nodeConfig: nodeconfig, @@ -83,11 +83,11 @@ type MRouteClient struct { // by making these interfaces accept multicast traffic with multicast ip:mgroup. 
// https://tldp.org/HOWTO/Multicast-HOWTO-6.html#ss6.4 func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceJoinMgroup(groupIP, addrIP, config.Name) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "address already in use") { return err } } @@ -95,9 +95,9 @@ func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { } func (c *MRouteClient) multicastInterfacesLeaveMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceLeaveMgroup(groupIP, addrIP, config.Name) if err != nil { return err @@ -158,7 +158,7 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) } // addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces, -// allowing multicast sender Pods to send multicast traffic to external. +// allowing multicast sender Pods to send multicast traffic to external. 
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) { klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs) err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs) diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index b9f4c14d69c..4fff580436c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -33,9 +33,10 @@ import ( ) var ( - addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} - addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} - nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} + externalInterfaceIP = net.ParseIP("192.168.50.23") + addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} + addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} + nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} ) func TestParseIGMPMsg(t *testing.T) { @@ -117,7 +118,7 @@ func newMockMulticastRouteClient(t *testing.T) *MRouteClient { groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName)) + return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName), false) } func (c *MRouteClient) initialize(t *testing.T) error { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d4995f2117d..1f5fe3be0ab 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -19,6 +19,7 @@ import ( "math/rand" "net" + "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" ofutil 
"antrea.io/libOpenflow/util" v1 "k8s.io/api/core/v1" @@ -280,11 +281,19 @@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. UninstallMulticastFlows(multicastIP net.IP) error + + // InstallMulticastRemoteReportFlows installs flows to forward the IGMP report messages to the other Nodes, + // and packetIn the report messages to Antrea Agent which is received via tunnel port. + // The OpenFlow group identified by groupID is used to forward packet to all other Nodes in the cluster + // over tunnel. + InstallMulticastRemoteReportFlows(groupID binding.GroupIDType) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, @@ -304,7 +313,13 @@ type Client interface { // UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port. UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error - InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error + + // SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port. 
+ SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error // InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular // Node and a local Gateway. @@ -800,7 +815,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } @@ -1190,6 +1205,15 @@ func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { return c.deleteFlows(c.featureMulticast.cachedFlows, cacheKey) } +func (c *client) InstallMulticastRemoteReportFlows(groupID binding.GroupIDType) error { + firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() + flows := c.featureMulticast.multicastRemoteReportFlows(groupID, firstMulticastTable) + cacheKey := "multicast_encap" + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) +} + func (c *client) SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, @@ -1240,7 +1264,25 @@ func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) erro return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) } -func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { +func (c *client) SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error { + srcMAC := c.nodeConfig.GatewayConfig.MAC.String() + srcIP := c.nodeConfig.NodeTransportIPv4Addr.IP.String() + 
dstMACStr := dstMAC.String() + dstIPStr := dstIP.String() + packetOutBuilder, err := setBasePacketOutBuilder(c.bridge.BuildPacketOut(), srcMAC, dstMACStr, srcIP, dstIPStr, openflow13.P_CONTROLLER, 0) + if err != nil { + return err + } + // Set protocol, L4 message, and target OF Group ID. + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolIGMP).SetL4Packet(igmp) + packetOutObj := packetOutBuilder.Done() + return c.bridge.SendPacketOut(packetOutObj) +} + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() table := MulticastOutputTable @@ -1248,7 +1290,7 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive table = MulticastIngressRuleTable } - if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers...); err != nil { + if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil { return err } return nil diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 79df3424a88..5d3de4bd05f 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -19,8 +19,10 @@ import ( "net" "sync" + "antrea.io/libOpenflow/openflow13" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -31,6 +33,8 @@ type featureMulticast struct { ipProtocols []binding.Protocol bridge binding.Bridge gatewayPort uint32 + encapEnabled bool + tunnelPort uint32 cachedFlows *flowCategoryCache groupCache sync.Map @@ -43,7 +47,7 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32) 
*featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, @@ -53,6 +57,8 @@ func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding groupCache: sync.Map{}, enableAntreaPolicy: anpEnabled, gatewayPort: gwPort, + encapEnabled: encapEnabled, + tunnelPort: tunnelPort, } } @@ -68,9 +74,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b func (f *featureMulticast) initFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() - return []binding.Flow{ - f.multicastOutputFlow(cookieID), - } + return f.multicastOutputFlows(cookieID) } func (f *featureMulticast) replayFlows() []binding.Flow { @@ -78,7 +82,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports ...uint32) error { +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error { group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() for i := range ports { group = group.Bucket(). @@ -87,6 +91,14 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(tableID). Done() } + for _, ip := range remoteIPs { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, f.tunnelPort). + SetTunnelDst(ip). + ResubmitToTable(MulticastOutputTable.GetID()). 
+ Done() + } if err := group.Add(); err != nil { return fmt.Errorf("error when installing Multicast receiver Group: %w", err) } @@ -94,12 +106,38 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, return nil } -func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { - return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). - Cookie(cookieID). - MatchRegMark(OFPortFoundRegMark). - Action().OutputToRegField(TargetOFPortField). - Done() +func (f *featureMulticast) multicastOutputFlows(cookieID uint64) []binding.Flow { + flows := []binding.Flow{ + MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), + } + if f.encapEnabled { + // When running with encap mode, drop the multicast packets if it is received from tunnel port and expected to + // output to antrea-gw0, or received from antrea-gw0 and expected to output to tunnel. These flows are used to + // avoid duplication on packet forwarding. For example, if the packet is received on tunnel port, it means + // the sender is a Pod on other Node, then the packet is already sent to external via antrea-gw0 on the source + // Node. On the reverse, if the packet is received on antrea-gw0, it means the sender is from external, then + // the Pod receivers on other Nodes should also receive the packets from the underlay network. + flows = append(flows, MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromTunnelRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.HostGatewayOFPort). + Action().Drop(). + Done(), + MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromGatewayRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.DefaultTunOFPort). + Action().Drop(). 
+ Done(), + ) + } + return flows } func (f *featureMulticast) multicastSkipIGMPMetricFlows() []binding.Flow { @@ -147,3 +185,34 @@ func (f *featureMulticast) replayGroups() { return true }) } + +func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { + return []binding.Flow{ + // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to + // broadcast to all the other Nodes in the cluster. The multicast groups inside the IGMP report message + // include the ones local Pods have joined in. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchInPort(openflow13.P_CONTROLLER). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().Group(groupID). + Done(), + // This flow ensures that the IGMP report message sent from Antrea Agent bypasses the check in SpoofGuardTable. + ClassifierTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(openflow13.P_CONTROLLER). + Action().GotoTable(SpoofGuardTable.GetNext()). + Done(), + // This flow ensures that the multicast packet sent from a different Node via the tunnel port enters the + // Multicast pipeline. + ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(f.tunnelPort). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*types.McastCIDR). + Action().LoadRegMark(FromTunnelRegMark). + Action().GotoTable(firstMulticastTable.GetID()). 
+ Done(), + } +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 3fdb03c5f69..b928ea13bef 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2666,20 +2666,24 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b // igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable // and sends it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { - flows := []binding.Flow{ - // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the - // OVS bridge, so that the OVS multicast db cache can be updated, and antrea-agent can identify the local multicast - // group and its members in the meanwhile. - // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in - // the packet. - MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). - Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(binding.ProtocolIGMP). - MatchRegMark(FromLocalRegMark). - Action().LoadRegMark(CustomReasonIGMPRegMark). - Action().SendToController(reason). - Action().Normal(). - Done(), + var flows []binding.Flow + sourceMarks := []*binding.RegMark{FromLocalRegMark} + if f.encapEnabled { + sourceMarks = append(sourceMarks, FromTunnelRegMark) + } + for _, m := range sourceMarks { + flows = append(flows, + // Set a custom reason for the IGMP packets, and then send it to antrea-agent. Then antrea-agent can identify + // the local multicast group and its members in the meanwhile. + // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in + // the packet. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchRegMark(m). 
+ Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().SendToController(reason). + Done()) } return flows } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f59dc19007f..cfb9be05fd8 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -297,17 +297,17 @@ func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) } // InstallMulticastGroup mocks base method -func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32, arg2 []net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // InstallMulticastGroup indicates an expected call of InstallMulticastGroup -func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1, arg2) } // InstallMulticastInitialFlows mocks base method @@ -324,6 +324,20 @@ func (mr *MockClientMockRecorder) InstallMulticastInitialFlows(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastInitialFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastInitialFlows), arg0) } +// InstallMulticastRemoteReportFlows mocks base method +func (m *MockClient) InstallMulticastRemoteReportFlows(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := 
m.ctrl.Call(m, "InstallMulticastRemoteReportFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastRemoteReportFlows indicates an expected call of InstallMulticastRemoteReportFlows +func (mr *MockClientMockRecorder) InstallMulticastRemoteReportFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastRemoteReportFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastRemoteReportFlows), arg0) +} + // InstallMulticlusterClassifierFlows mocks base method func (m *MockClient) InstallMulticlusterClassifierFlows(arg0 uint32, arg1 bool) error { m.ctrl.T.Helper() @@ -670,6 +684,20 @@ func (mr *MockClientMockRecorder) SendIGMPQueryPacketOut(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPQueryPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPQueryPacketOut), arg0, arg1, arg2, arg3) } +// SendIGMPRemoteReportPacketOut mocks base method +func (m *MockClient) SendIGMPRemoteReportPacketOut(arg0 net.HardwareAddr, arg1 net.IP, arg2 util.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendIGMPRemoteReportPacketOut", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendIGMPRemoteReportPacketOut indicates an expected call of SendIGMPRemoteReportPacketOut +func (mr *MockClientMockRecorder) SendIGMPRemoteReportPacketOut(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPRemoteReportPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPRemoteReportPacketOut), arg0, arg1, arg2) +} + // SendTCPPacketOut mocks base method func (m *MockClient) SendTCPPacketOut(arg0, arg1, arg2, arg3 string, arg4, arg5 uint32, arg6 bool, arg7, arg8 uint16, arg9 uint32, arg10 byte, arg11 func(openflow.PacketOutBuilder) openflow.PacketOutBuilder) error { m.ctrl.T.Helper() diff --git a/pkg/agent/route/route_linux.go 
b/pkg/agent/route/route_linux.go index 1514e226229..8f57d0d8040 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -57,6 +57,10 @@ const ( localAntreaFlexibleIPAMPodIPSet = "LOCAL-FLEXIBLE-IPAM-POD-IP" // localAntreaFlexibleIPAMPodIP6Set contains all AntreaFlexibleIPAM Pod IPv6s of this Node. localAntreaFlexibleIPAMPodIP6Set = "LOCAL-FLEXIBLE-IPAM-POD-IP6" + // clusterNodeIPSet contains all other Node IPs in the cluster. + clusterNodeIPSet = "CLUSTER-NODE-IP" + // clusterNodeIP6Set contains all other Node IP6s in the cluster. + clusterNodeIP6Set = "CLUSTER-NODE-IP6" // Antrea proxy NodePort IP antreaNodePortIPSet = "ANTREA-NODEPORT-IP" @@ -114,6 +118,10 @@ type Client struct { clusterIPv4CIDR *net.IPNet // clusterIPv6CIDR stores the calculated ClusterIP CIDR for IPv6. clusterIPv6CIDR *net.IPNet + // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster + clusterNodeIPs sync.Map + // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster + clusterNodeIP6s sync.Map } // NewClient returns a route client. 
@@ -339,6 +347,29 @@ func (c *Client) syncIPSet() error { } } + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + if err := ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil { + return err + } + if err := ipset.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true); err != nil { + return err + } + c.clusterNodeIPs.Range(func(_, v interface{}) bool { + ipsetEntry := v.(string) + if err := ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil { + return false + } + return true + }) + c.clusterNodeIP6s.Range(func(_, v interface{}) bool { + ipSetEntry := v.(string) + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return false + } + return true + }) + } + return nil } @@ -486,6 +517,7 @@ func (c *Client) syncIPTables() error { antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, + clusterNodeIPSet, config.VirtualNodePortDNATIPv4, config.VirtualServiceIPv4, snatMarkToIPv4, @@ -503,6 +535,7 @@ func (c *Client) syncIPTables() error { antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, + clusterNodeIP6Set, config.VirtualNodePortDNATIPv6, config.VirtualServiceIPv6, snatMarkToIPv6, @@ -519,7 +552,8 @@ func (c *Client) syncIPTables() error { func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, - nodePortIPSet string, + nodePortIPSet, + clusterNodeIPSet string, nodePortDNATVirtualIP, serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP, @@ -559,6 +593,19 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, "-j", iptables.NoTrackTarget, }...) } + + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + // Drop the multicast packets forwarded from other Nodes in the cluster. 
This is because + // the packet sent out from the sender Pod is already received via tunnel port with encap mode, + // and the one forwarded via the underlay network is to send to external receivers + writeLine(iptablesData, []string{ + "-A", antreaPreRoutingChain, + "-m", "comment", "--comment", `"Antrea: drop Pod multicast traffic forwarded via underlay network"`, + "-m", "set", "--match-set", clusterNodeIPSet, "src", + "-d", types.McastCIDR.String(), + "-j", iptables.DROPTarget, + }...) + } } writeLine(iptablesData, "COMMIT") @@ -1025,6 +1072,10 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, nodeIP, nodeGwIP c.nodeNeighbors.Store(podCIDRStr, neigh) } + if err := c.addNodeIP(podCIDR, nodeIP); err != nil { + return err + } + c.nodeRoutes.Store(podCIDRStr, routes) return nil } @@ -1048,6 +1099,9 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { return err } } + if err := c.deleteNodeIP(podCIDR); err != nil { + return err + } } if podCIDR.IP.To4() == nil { neigh, exists := c.nodeNeighbors.Load(podCIDRStr) @@ -1478,6 +1532,62 @@ func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) err return nil } +// addNodeIP adds nodeIP into the ipset when a new Node joins the cluster. +// The ipset is consumed with encap mode when multicast is enabled. +func (c *Client) addNodeIP(podCIDR *net.IPNet, nodeIP net.IP) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + if nodeIP == nil { + return nil + } + ipSetEntry := nodeIP.String() + if nodeIP.To4() != nil { + if err := ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Store(podCIDR.String(), ipSetEntry) + } else { + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Store(podCIDR.String(), ipSetEntry) + } + return nil +} + +// deleteNodeIP deletes NodeIPs from the ipset when a Node leaves the cluster. 
+// The ipset is consumed with encap mode when multicast is enabled. +func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + + podCIDRStr := podCIDR.String() + if podCIDR.IP.To4() != nil { + obj, exists := c.clusterNodeIPs.Load(podCIDRStr) + if !exists { + return nil + } + ipSetEntry := obj.(string) + if err := ipset.DelEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Delete(podCIDRStr) + } else { + obj, exists := c.clusterNodeIP6s.Load(podCIDRStr) + if !exists { + return nil + } + ipSetEntry := obj.(string) + if err := ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Delete(podCIDRStr) + } + return nil +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/types/multicast.go b/pkg/agent/types/multicast.go index da06447af3b..ae910dea965 100644 --- a/pkg/agent/types/multicast.go +++ b/pkg/agent/types/multicast.go @@ -32,6 +32,7 @@ type IGMPNPRuleInfo struct { var ( McastAllHosts = net.ParseIP("224.0.0.1").To4() + IGMPv3Router = net.ParseIP("224.0.0.22").To4() _, McastCIDR, _ = net.ParseCIDR("224.0.0.0/4") ) diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 113bc028927..780c8b5fa4c 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -36,6 +36,7 @@ const ( RawTable = "raw" AcceptTarget = "ACCEPT" + DROPTarget = "DROP" MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ReturnTarget = "RETURN" diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 74e362e69b0..e55ceb2df47 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -342,6 +342,7 @@ type BucketBuilder interface { LoadRegRange(regID int, data uint32, rng *Range) 
BucketBuilder LoadToRegField(field *RegField, data uint32) BucketBuilder ResubmitToTable(tableID uint8) BucketBuilder + SetTunnelDst(addr net.IP) BucketBuilder Done() Group } diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index ee37a35adc5..b16495b0c4a 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -16,6 +16,7 @@ package openflow import ( "fmt" + "net" "antrea.io/libOpenflow/openflow13" "antrea.io/ofnet/ofctrl" @@ -128,6 +129,13 @@ func (b *bucketBuilder) ResubmitToTable(tableID uint8) BucketBuilder { return b } +// SetTunnelDst is an action to set tunnel destination address when the bucket is selected. +func (b *bucketBuilder) SetTunnelDst(addr net.IP) BucketBuilder { + setTunDstAct := &ofctrl.SetTunnelDstAction{IP: addr} + b.bucket.AddAction(setTunDstAct.GetActionMessage()) + return b +} + // Weight sets the weight of a bucket. func (b *bucketBuilder) Weight(val uint16) BucketBuilder { b.bucket.Weight = val diff --git a/pkg/util/k8s/node.go b/pkg/util/k8s/node.go index dd288e98fc2..5318406da56 100644 --- a/pkg/util/k8s/node.go +++ b/pkg/util/k8s/node.go @@ -145,3 +145,19 @@ func GetNodeAllAddrs(node *v1.Node) (ips sets.String, err error) { return } + +func GetNodeTransportAddrs(node *v1.Node) (*ip.DualStackIPs, error) { + transportAddrs, err := GetNodeAddrsFromAnnotations(node, types.NodeTransportAddressAnnotationKey) + if err != nil { + return nil, err + } + if transportAddrs != nil { + return transportAddrs, nil + } + // Use NodeIP if the transport IP address is not set or not found. 
+ nodeAddrs, err := GetNodeAddrs(node) + if err != nil { + return nil, err + } + return nodeAddrs, nil +} diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 136e9e43d6f..d161ab09169 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -30,8 +30,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/multicast" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + agentconfig "antrea.io/antrea/pkg/config/agent" "antrea.io/antrea/pkg/features" ) @@ -56,6 +58,22 @@ func TestMulticast(t *testing.T) { if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } + t.Run("testMulticastWithNoEncap", func(t *testing.T) { + skipIfEncapModeIsNot(t, data, config.TrafficEncapModeNoEncap) + runMulticastTestCases(t, data, nodeMulticastInterfaces, true) + }) + t.Run("testMulticastWithEncap", func(t *testing.T) { + ac := func(config *agentconfig.AgentConfig) { + config.TrafficEncapMode = "encap" + } + if err := data.mutateAntreaConfigMap(nil, ac, true, true); err != nil { + t.Fatalf("Failed to deploy cluster with encap mode: %v", err) + } + runMulticastTestCases(t, data, nodeMulticastInterfaces, false) + }) +} + +func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ @@ -92,7 +110,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -132,11 +150,14 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + 
runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) t.Run("testMulticastForwardToMultipleInterfaces", func(t *testing.T) { + // Skip this case with encap mode because iptables masquerade is configured, and it leads the multicast packet + // sent from Pod are not able to forwarded to more than one network interface on the host. + skipIfEncapModeIsNot(t, data, config.TrafficEncapModeNoEncap) multipleInterfacesFound := false var nodeIdx int for i, ifaces := range nodeMulticastInterfaces { @@ -365,6 +386,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error when waiting for ANP %s to be realized: %v", np.Name, err) } + defer data.DeleteANP(data.testNamespace, anp.name) } for _, anp := range mc.igmpANPConfigs { @@ -403,6 +425,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error when waiting for ANP %s released: %v", np.Name, err) } + defer data.DeleteANP(data.testNamespace, anp.name) } for _, receiverConfig := range mc.receiverConfigs { @@ -556,7 +579,12 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { + currentEncapMode, _ := data.GetEncapMode() + if requiresExternalHostSupport(mc) && currentEncapMode == config.TrafficEncapModeEncap { + t.Logf("Multicast does not support using hostNetwork Pod to simulate the external host with encap mode, skip the case") + return + } mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -602,19 +630,21 @@ func runTestMulticastBetweenPods(t *testing.T, data 
*TestData, mc multicastTestc continue } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) - if err != nil { - return false, err - } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. - if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { - if len(mRouteResult) == 0 { - return false, nil + if checkReceiverRoute { + _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) + if err != nil { + return false, err } - } else { - if len(mRouteResult) != 0 { - return false, nil + // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, + // the receivers should configure corresponding inbound multicast routes. 
+ if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { + if len(mRouteResult) == 0 { + return false, nil + } + } else { + if len(mRouteResult) != 0 { + return false, nil + } } } _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String())) @@ -707,3 +737,15 @@ func checkAntctlResult(t *testing.T, data *TestData, antreaPodName, containerPod match, _ := regexp.MatchString(fmt.Sprintf("%s[[:space:]]+%s[[:space:]]+%d[[:space:]]+%d", data.testNamespace, containerPodName, inbound, outbound), strings.TrimSpace(stdout)) return match, nil } + +func requiresExternalHostSupport(mc multicastTestcase) bool { + if mc.senderConfig.isHostNetwork { + return true + } + for _, receiver := range mc.receiverConfigs { + if receiver.isHostNetwork { + return true + } + } + return false +}