From 70ade2bbb8897c9e7632b87f8330a4417420e963 Mon Sep 17 00:00:00 2001 From: wenyingd Date: Mon, 13 Jun 2022 16:42:42 +0800 Subject: [PATCH] [Multicast] support encap mode 1, Use a single routine to send local multicast groups in an IGMP v3 Report message to notify all the other Nodes in the cluster 2. Multicast controller maintains both local Pod members and remote Nodes which has Pod members for each multicast group found in the cluster 3. Add remote Node members in the OpenFlow group buckets. 4. Agent drops the duplicated multicast packet received from underlay by - adding iptables rules in raw table antreaPreRouting chain to drop multicast packets sent from other Nodes, because the Pod multicast traffic is received from tunnel with encap mode. - adding an ipset to maintain IPs of other Nodes in the cluster, which is used as source in the iptables rule. Signed-off-by: wenyingd --- build/charts/antrea/conf/antrea-agent.conf | 2 +- .../charts/antrea/conf/antrea-controller.conf | 2 +- build/yamls/antrea-aks.yml | 8 +- build/yamls/antrea-eks.yml | 8 +- build/yamls/antrea-gke.yml | 8 +- build/yamls/antrea-ipsec.yml | 8 +- build/yamls/antrea.yml | 8 +- cmd/antrea-agent/agent.go | 4 +- .../noderoute/node_route_controller.go | 23 +- pkg/agent/multicast/mcast_controller.go | 435 +++++++++++++++--- pkg/agent/multicast/mcast_controller_test.go | 363 +++++++++++++-- pkg/agent/multicast/mcast_discovery.go | 81 +++- pkg/agent/multicast/mcast_discovery_test.go | 185 ++++++++ pkg/agent/multicast/mcast_route.go | 10 +- pkg/agent/multicast/mcast_route_test.go | 9 +- pkg/agent/openflow/client.go | 50 +- pkg/agent/openflow/multicast.go | 91 +++- pkg/agent/openflow/pipeline.go | 32 +- pkg/agent/openflow/testing/mock_openflow.go | 36 +- pkg/agent/route/route_linux.go | 112 ++++- pkg/agent/types/multicast.go | 1 + pkg/agent/util/iptables/iptables.go | 1 + pkg/ovs/openflow/interfaces.go | 1 + pkg/ovs/openflow/ofctrl_group.go | 8 + pkg/util/k8s/node.go | 16 + test/e2e/multicast_test.go 
| 57 ++- 26 files changed, 1341 insertions(+), 218 deletions(-) create mode 100644 pkg/agent/multicast/mcast_discovery_test.go diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 66247216af2..ed3f4dbd93c 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -46,7 +46,7 @@ featureGates: # IPAM when configuring secondary network interfaces with Multus. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "AntreaIPAM" "default" false) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. diff --git a/build/charts/antrea/conf/antrea-controller.conf b/build/charts/antrea/conf/antrea-controller.conf index 440952a8757..745e8df9496 100644 --- a/build/charts/antrea/conf/antrea-controller.conf +++ b/build/charts/antrea/conf/antrea-controller.conf @@ -17,7 +17,7 @@ featureGates: # Enable collecting and exposing NetworkPolicy statistics. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NetworkPolicyStats" "default" true) }} -# Enable multicast traffic. This feature is supported only with noEncap mode. +# Enable multicast traffic. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Multicast" "default" false) }} # Enable controlling SNAT IPs of Pod egress traffic. diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ffdbe7d4304..03e3874d5a4 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. 
# Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3933,7 +3933,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 7c64611d3a5..5c9fd89a175 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. 
# Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. @@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-agent @@ -3935,7 +3935,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: b82a5504883f65d32538dd4c2de4e01f4ac99203ff69191463715f67878e0745 + checksum/config: beca655f34bfd122082c7efa73505680278a8aa97e74099ca6040bcc4311622f labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 3a5dbc75ac4..7822ecf0c82 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: c74fa3f40177249ad901af12a4127b31b3291f9b8bf3ce6a9be1e666e29c5447 + checksum/config: 741b313c6ab0ed98e7d994985861722f503a93529f90a5141b8a6e0c124d8904 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 06a718aec3c..04fd7cbf74a 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2596,7 +2596,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2885,7 +2885,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3705,7 +3705,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -3991,7 +3991,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1609abc57e2865390df7a7d99e4c3b342c7e097fa879fefe8e4315130eaa9019 + checksum/config: c74f29ceba3905db50cef22ee46f73e1c101c108a70e70918b17413c174081e8 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 69e328f1dd2..ba216d1e056 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2583,7 +2583,7 @@ data: # IPAM when configuring secondary network interfaces with Multus. # AntreaIPAM: false - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. @@ -2872,7 +2872,7 @@ data: # Enable collecting and exposing NetworkPolicy statistics. # NetworkPolicyStats: true - # Enable multicast traffic. This feature is supported only with noEncap mode. + # Enable multicast traffic. # Multicast: false # Enable controlling SNAT IPs of Pod egress traffic. 
@@ -3692,7 +3692,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-agent @@ -3932,7 +3932,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0814cc9f3baa94e76e83a108b04d05200485610c7f5950c584503af7151a9e86 + checksum/config: 056a828ba2400e94aa9c43e6e74a4b007027bf6b95a68e1e15f34cd6ffeb2baa labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 901d48cae7f..78e715e6070 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -652,7 +652,9 @@ func run(o *Options) error { ovsBridgeClient, podUpdateChannel, o.igmpQueryInterval, - validator) + validator, + networkConfig.TrafficEncapMode.SupportsEncap(), + informerFactory) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/controller/noderoute/node_route_controller.go b/pkg/agent/controller/noderoute/node_route_controller.go index 61105539235..03f98df73e4 100644 --- a/pkg/agent/controller/noderoute/node_route_controller.go +++ b/pkg/agent/controller/noderoute/node_route_controller.go @@ -501,8 +501,9 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error { if err != nil { return fmt.Errorf("error when retrieving MAC of Node %s: %v", nodeName, err) } - peerNodeIPs, err := c.getNodeTransportAddrs(node) + peerNodeIPs, err := k8s.GetNodeTransportAddrs(node) if err != nil { + klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", node.Name) return err } 
peerWireGuardPublicKey := node.Annotations[types.NodeWireGuardPublicAnnotationKey] @@ -799,23 +800,3 @@ func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) { } return mac, nil } - -func (c *Controller) getNodeTransportAddrs(node *corev1.Node) (*utilip.DualStackIPs, error) { - if c.networkConfig.TransportIface != "" || len(c.networkConfig.TransportIfaceCIDRs) > 0 { - transportAddrs, err := k8s.GetNodeAddrsFromAnnotations(node, types.NodeTransportAddressAnnotationKey) - if err != nil { - return nil, err - } - if transportAddrs != nil { - return transportAddrs, nil - } - klog.InfoS("Transport address is not found, using NodeIP instead", "node", node.Name) - } - // Use NodeIP if the transport IP address is not set or not found. - peerNodeIPs, err := k8s.GetNodeAddrs(node) - if err != nil { - klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", node.Name) - return nil, err - } - return peerNodeIPs, nil -} diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 12a14bbcb16..f557041a257 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -19,9 +19,14 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -35,6 +40,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/k8s" ) type eventType uint8 @@ -48,6 +54,13 @@ const ( // How long to wait before retrying the processing of a multicast group change. 
minRetryDelay = 5 * time.Second maxRetryDelay = 300 * time.Second + + // Interval of reprocessing every node. + nodeResyncPeriod = 60 * time.Second + + // nodeUpdateKey is a key to trigger the Node list operation and update the OpenFlow group buckets to report + // the local multicast groups to other Nodes. + nodeUpdateKey = "nodeUpdate" ) var ( @@ -61,13 +74,18 @@ type mcastGroupEvent struct { eType eventType time time.Time iface *interfacestore.InterfaceConfig + // srcNode is the Node IP where the IGMP report message is sent from. It is set only with encap mode. + srcNode net.IP } type GroupMemberStatus struct { group net.IP // localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name, // and value is its last update time. - localMembers map[string]time.Time + localMembers map[string]time.Time + // remoteMembers is a set for Nodes which have joined the multicast group in the cluster. The Node's IP is + // added in the set. + remoteMembers sets.String lastIGMPReport time.Time ofGroupID binding.GroupIDType } @@ -91,11 +109,12 @@ func (c *Controller) eventHandler(stopCh <-chan struct{}) { // addGroupMemberStatus adds the new group into groupCache. 
func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { status := &GroupMemberStatus{ - group: e.group, - lastIGMPReport: e.time, - localMembers: map[string]time.Time{e.iface.InterfaceName: e.time}, - ofGroupID: c.v4GroupAllocator.Allocate(), + group: e.group, + ofGroupID: c.v4GroupAllocator.Allocate(), + remoteMembers: sets.NewString(), + localMembers: make(map[string]time.Time), } + status = addGroupMember(status, e) c.groupCache.Add(status) c.queue.Add(e.group.String()) klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) @@ -112,17 +131,17 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent newStatus := &GroupMemberStatus{ group: status.group, localMembers: make(map[string]time.Time), + remoteMembers: status.remoteMembers, lastIGMPReport: status.lastIGMPReport, ofGroupID: status.ofGroupID, } for m, t := range status.localMembers { newStatus.localMembers[m] = t } - _, exist := status.localMembers[e.iface.InterfaceName] + exist := memberExists(status, e) switch e.eType { case groupJoin: - newStatus.lastIGMPReport = e.time - newStatus.localMembers[e.iface.InterfaceName] = e.time + newStatus = addGroupMember(newStatus, e) c.groupCache.Update(newStatus) if !exist { klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) @@ -130,18 +149,21 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent } case groupLeave: if exist { - delete(newStatus.localMembers, e.iface.InterfaceName) + newStatus = deleteGroupMember(newStatus, e) c.groupCache.Update(newStatus) - klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) - // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are - // other local members in the multicast group. 
- if !found || len(newStatus.localMembers) > 0 { - c.queue.Add(newStatus.group.String()) + if e.iface.Type == interfacestore.ContainerInterface { + _, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName) + // Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are + // other local members in the multicast group. + if !found || len(newStatus.localMembers) > 0 { + c.queue.Add(newStatus.group.String()) + } else { + // Check if all local members have left the multicast group. + klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + c.checkLastMember(e.group) + } } else { - // Check if all local members have left the multicast group. - klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) - c.checkLastMember(e.group) + c.queue.Add(newStatus.group.String()) } } } @@ -175,6 +197,7 @@ func (c *Controller) clearStaleGroups() { if now.Sub(lastUpdate) > c.mcastGroupTimeout { ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: member, + Type: interfacestore.ContainerInterface, } event := &mcastGroupEvent{ group: status.group, @@ -200,6 +223,7 @@ func (c *Controller) removeLocalInterface(podEvent types.PodUpdate) { interfaceName := util.GenerateContainerInterfaceName(podEvent.PodName, podEvent.PodNamespace, podEvent.ContainerID) ifConfig := &interfacestore.InterfaceConfig{ InterfaceName: interfaceName, + Type: interfacestore.ContainerInterface, } groupStatuses := c.getGroupMemberStatusesByPod(interfaceName) for _, g := range groupStatuses { @@ -222,17 +246,31 @@ type Controller struct { groupEventCh chan *mcastGroupEvent groupCache cache.Indexer queue workqueue.RateLimitingInterface - // installedGroups saves the groups which are configured on both OVS and the host. 
+ nodeInformer coreinformers.NodeInformer + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + nodeUpdateQueue workqueue.RateLimitingInterface + // installedGroups saves the groups which are configured on OVS. + // With encap mode, the entries in installedGroups include all multicast groups identified in the cluster. installedGroups sets.String installedGroupsMutex sync.RWMutex - mRouteClient *MRouteClient - ovsBridgeClient ovsconfig.OVSBridgeClient + // installedLocalGroups saves the groups which are configured on OVS and host. The entries in installedLocalGroups + // include the multicast groups that local Pod members join. + installedLocalGroups sets.String + installedLocalGroupsMutex sync.RWMutex + mRouteClient *MRouteClient + ovsBridgeClient ovsconfig.OVSBridgeClient // queryInterval is the interval to send IGMP query messages. queryInterval time.Duration // mcastGroupTimeout is the timeout to detect a group as stale if no IGMP report is received within the time. mcastGroupTimeout time.Duration // the group ID in OVS for group which IGMP queries are sent to queryGroupId binding.GroupIDType + // nodeGroupID is the OpenFlow group ID in OVS which is used to send IGMP report messages to other Nodes. + nodeGroupID binding.GroupIDType + // installedNodes is the installed Node set that the IGMP report message is sent to. 
+ installedNodes sets.String + encapEnabled bool } func NewMulticastController(ofClient openflow.Client, @@ -244,28 +282,54 @@ func NewMulticastController(ofClient openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, podUpdateSubscriber channel.Subscriber, igmpQueryInterval time.Duration, - validator types.McastNetworkPolicyController) *Controller { + validator types.McastNetworkPolicyController, + isEncap bool, + informerFactory informers.SharedInformerFactory) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) - groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator) + groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces) + multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap) c := &Controller{ - ofClient: ofClient, - ifaceStore: ifaceStore, - v4GroupAllocator: v4GroupAllocator, - nodeConfig: nodeConfig, - igmpSnooper: groupSnooper, - groupEventCh: eventCh, - groupCache: groupCache, - installedGroups: sets.NewString(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - mRouteClient: multicastRouteClient, - ovsBridgeClient: ovsBridgeClient, - queryInterval: igmpQueryInterval, - mcastGroupTimeout: igmpQueryInterval * 3, - queryGroupId: v4GroupAllocator.Allocate(), + ofClient: ofClient, + ifaceStore: ifaceStore, + v4GroupAllocator: v4GroupAllocator, + nodeConfig: nodeConfig, + igmpSnooper: groupSnooper, + groupEventCh: eventCh, + groupCache: groupCache, + installedGroups: sets.NewString(), + installedLocalGroups: sets.NewString(), + queue: 
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), + mRouteClient: multicastRouteClient, + ovsBridgeClient: ovsBridgeClient, + queryInterval: igmpQueryInterval, + mcastGroupTimeout: igmpQueryInterval * 3, + queryGroupId: v4GroupAllocator.Allocate(), + encapEnabled: isEncap, + } + if isEncap { + c.nodeGroupID = v4GroupAllocator.Allocate() + c.installedNodes = sets.NewString() + c.nodeInformer = informerFactory.Core().V1().Nodes() + c.nodeLister = c.nodeInformer.Lister() + c.nodeListerSynced = c.nodeInformer.Informer().HasSynced + c.nodeUpdateQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "nodeUpdate") + c.nodeInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + c.nodeUpdateQueue.Add(nodeUpdateKey) + }, + UpdateFunc: func(old, cur interface{}) { + c.checkNodeUpdate(old, cur) + }, + DeleteFunc: func(old interface{}) { + c.nodeUpdateQueue.Add(nodeUpdateKey) + }, + }, + nodeResyncPeriod, + ) } podUpdateSubscriber.Subscribe(c.memberChanged) return c @@ -290,6 +354,16 @@ func (c *Controller) Initialize() error { if err != nil { return err } + if c.encapEnabled { + // Install OpenFlow group to send the multicast groups that local Pods joined to all other Nodes in the cluster. 
+ if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nil); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + } + if err := c.ofClient.InstallMulticastRemoteReportFlows(c.nodeGroupID); err != nil { + klog.ErrorS(err, "Failed to install OpenFlow group and flow to send IGMP report to other Nodes") + return err + } + } return nil } @@ -301,6 +375,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } }, c.queryInterval, stopCh) + if c.encapEnabled { + go wait.NonSlidingUntil(c.syncLocalGroupsToOtherNodes, c.queryInterval, stopCh) + go wait.Until(c.nodeWorker, time.Second, stopCh) + } + // Periodically check the group member status, and remove the groups in which no members exist go wait.NonSlidingUntil(c.clearStaleGroups, c.queryInterval, stopCh) go c.eventHandler(stopCh) @@ -380,54 +459,107 @@ func (c *Controller) syncGroup(groupKey string) error { } memberPorts = append(memberPorts, uint32(obj.OFPort)) } - if c.groupHasInstalled(groupKey) { - if c.groupIsStale(status) { - // Remove the multicast flow entry if no local Pod is in the group. 
- if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + var remoteNodeReceivers []net.IP + if c.encapEnabled { + remoteNodeReceivers = make([]net.IP, 0, len(status.remoteMembers)) + for member := range status.remoteMembers { + remoteNodeReceivers = append(remoteNodeReceivers, net.ParseIP(member)) + } + } + installLocalMulticastGroup := func() error { + if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { + klog.ErrorS(err, "Failed to install multicast group identified with local members", "group", groupKey) + return err + } + if c.encapEnabled { + if err := c.igmpSnooper.sendIGMPJoinReport([]net.IP{status.group}); err != nil { + klog.ErrorS(err, "Failed to sync local multicast group to other Nodes", "group", groupKey) return err } - // Remove the multicast flow entry if no local Pod is in the group. - if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { - klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) - return err + } + c.addInstalledLocalGroup(groupKey) + klog.InfoS("New local multicast group is added", "group", groupKey) + return nil + } + deleteLocalMulticastGroup := func() error { + err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + if err != nil { + klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) + return err + } + klog.InfoS("Removed multicast route entry", "group", status.group) + err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) + if err != nil { + klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + return err + } + + if c.encapEnabled { + group := net.ParseIP(groupKey) + // Send IGMP leave message to other Nodes to notify the current Node leaves the given multicast group. 
+ if err := c.igmpSnooper.sendIGMPLeaveReport([]net.IP{group}); err != nil { + klog.ErrorS(err, "Failed to send IGMP leave message to other Nodes", "group", groupKey) } - c.v4GroupAllocator.Release(status.ofGroupID) - err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) - if err != nil { - klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) - return err + } + c.delInstalledLocalGroup(groupKey) + return nil + } + if c.groupHasInstalled(groupKey) { + if c.groupIsStale(status) { + if c.localGroupHasInstalled(groupKey) { + if err := deleteLocalMulticastGroup(); err != nil { + return err + } + } + // TODO: add check on the stale multicast group that is joined by the Pods on a different Node. + // remoteMembers is always empty with noEncap mode. + if status.remoteMembers.Len() == 0 { + // Remove the multicast OpenFlow flow and group entries if none Pod member on local or remote Node is in the group. + if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) + return err + } + // Remove the multicast flow entry if no local Pod is in the group. + if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil { + klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey) + return err + } + c.v4GroupAllocator.Release(status.ofGroupID) + c.delInstalledGroup(groupKey) + c.groupCache.Delete(status) + klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) + return nil } - err = c.mRouteClient.multicastInterfacesLeaveMgroup(status.group) - if err != nil { - klog.ErrorS(err, "Failed to leave multicast group for multicast interfaces", "group", groupKey) + } else if !c.localGroupHasInstalled(groupKey) { + // Install multicast flows and routing entries for the multicast group that local Pods join. 
+ if err := installLocalMulticastGroup(); err != nil { return err } - c.delInstalledGroup(groupKey) - c.groupCache.Delete(status) - klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) - return nil } - // Reinstall OpenFlow group because the local Pod receivers have changed. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + // Reinstall OpenFlow group because either the remote node receivers or local Pod receivers have changed. + klog.V(2).InfoS("Updating OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.InfoS("Updated OpenFlow group for receivers in multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) return nil } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. - if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts, remoteNodeReceivers); err != nil { return err } - klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts) + klog.V(2).InfoS("Installed OpenFlow group for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) // Install OpenFlow flow to forward packets to local Pod receivers which are included in the group. 
if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } - if err := c.mRouteClient.multicastInterfacesJoinMgroup(status.group); err != nil { - klog.ErrorS(err, "Failed to join multicast group for multicast interfaces", "group", status.group) - return err + klog.InfoS("Installed OpenFlow flows for multicast group", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts, "remoteReceivers", remoteNodeReceivers) + if len(status.localMembers) > 0 { + err := installLocalMulticastGroup() + if err != nil { + return err + } } c.addInstalledGroup(groupKey) return nil @@ -461,6 +593,24 @@ func (c *Controller) delInstalledGroup(groupKey string) { c.installedGroupsMutex.Unlock() } +func (c *Controller) localGroupHasInstalled(groupKey string) bool { + c.installedLocalGroupsMutex.RLock() + defer c.installedLocalGroupsMutex.RUnlock() + return c.installedLocalGroups.Has(groupKey) +} + +func (c *Controller) addInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Insert(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + +func (c *Controller) delInstalledLocalGroup(groupKey string) { + c.installedLocalGroupsMutex.Lock() + c.installedLocalGroups.Delete(groupKey) + c.installedLocalGroupsMutex.Unlock() +} + func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) { obj, ok, _ := c.groupCache.GetByKey(e.group.String()) switch e.eType { @@ -509,7 +659,7 @@ func (c *Controller) updateQueryGroup() error { memberPorts = append(memberPorts, uint32(iface.OFPort)) } // Install OpenFlow group for a new multicast group which has local Pod receivers joined. 
- if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts); err != nil { + if err := c.ofClient.InstallMulticastGroup(c.queryGroupId, memberPorts, nil); err != nil { return err } klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", types.McastAllHosts.String(), @@ -517,6 +667,64 @@ func (c *Controller) updateQueryGroup() error { return nil } +// syncLocalGroupsToOtherNodes sends IGMP join message to other Nodes in the same cluster to notify what multicast groups +// are joined by this Node. This function is used only with encap mode. +func (c *Controller) syncLocalGroupsToOtherNodes() { + if c.installedLocalGroups.Len() == 0 { + return + } + localGroups := make([]net.IP, 0, c.installedLocalGroups.Len()) + c.installedLocalGroupsMutex.RLock() + for group := range c.installedLocalGroups { + localGroups = append(localGroups, net.ParseIP(group)) + } + c.installedLocalGroupsMutex.RUnlock() + if err := c.igmpSnooper.sendIGMPJoinReport(localGroups); err != nil { + klog.ErrorS(err, "Failed to sync local multicast groups to other Nodes") + } +} + +func (c *Controller) syncNodes() error { + startTime := time.Now() + defer func() { + klog.V(4).Infof("Finished syncing Node IPs. 
(%v)", time.Since(startTime)) + }() + + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Failed to list Nodes") + return err + } + var updatedNodeIPs []net.IP + updatedNodeIPSet := sets.NewString() + for _, n := range nodes { + if n.Name == c.nodeConfig.Name { + continue + } + nip, err := k8s.GetNodeTransportAddrs(n) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node IP addresses", "node", n.Name) + return err + } + if nip.IPv4 != nil { + updatedNodeIPs = append(updatedNodeIPs, nip.IPv4) + updatedNodeIPSet.Insert(nip.IPv4.String()) + } + } + if c.installedNodes.Equal(updatedNodeIPSet) { + klog.V(2).InfoS("Nodes in the cluster are not changed, ignore the event") + return nil + } + if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, updatedNodeIPs); err != nil { + klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + return err + } + c.installedNodes = updatedNodeIPSet + // Notify local installed multicast groups to other Nodes in the cluster. 
+ c.syncLocalGroupsToOtherNodes() + return nil +} + func podInterfaceIndexFunc(obj interface{}) ([]string, error) { groupState := obj.(*GroupMemberStatus) podInterfaces := make([]string, 0, len(groupState.localMembers)) @@ -596,3 +804,94 @@ func (c *Controller) GetAllPodsStats() map[*interfacestore.InterfaceConfig]*PodT } return statsMap } + +func (c *Controller) checkNodeUpdate(old interface{}, cur interface{}) { + oldNode := old.(*corev1.Node) + if oldNode.Name == c.nodeConfig.Name { + return + } + curNode := cur.(*corev1.Node) + oldIPs, err := k8s.GetNodeTransportAddrs(oldNode) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node old IP addresses", "node", oldNode.Name) + return + } + newIPs, err := k8s.GetNodeTransportAddrs(curNode) + if err != nil { + klog.ErrorS(err, "Failed to retrieve Node current IP addresses", "node", curNode.Name) + return + } + if (*newIPs).Equal(*oldIPs) { + return + } + c.nodeUpdateQueue.Add(nodeUpdateKey) +} + +func (c *Controller) nodeWorker() { + for c.processNextNodeItem() { + } +} + +func (c *Controller) processNextNodeItem() bool { + obj, quit := c.nodeUpdateQueue.Get() + if quit { + return false + } + // We call Done here so the workqueue knows we have finished processing this item. We also + // must remember to call Forget if we do not want this work item being re-queued. For + // example, we do not call Forget if a transient error occurs, instead the item is put back + // on the workqueue and attempted again after a back-off period. + defer c.nodeUpdateQueue.Done(obj) + + // We expect strings (Node name) to come off the workqueue. + if key, ok := obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen: only a constant string enqueues nodeUpdateQueue. 
+ c.nodeUpdateQueue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.syncNodes(); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.nodeUpdateQueue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.nodeUpdateQueue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing Nodes, requeuing") + } + return true +} + +func memberExists(status *GroupMemberStatus, e *mcastGroupEvent) bool { + var exist bool + if e.iface.Type == interfacestore.ContainerInterface { + _, exist = status.localMembers[e.iface.InterfaceName] + } else if e.iface.Type == interfacestore.TunnelInterface { + exist = status.remoteMembers.Has(e.srcNode.String()) + } + return exist +} + +func addGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + status.localMembers[e.iface.InterfaceName] = e.time + status.lastIGMPReport = e.time + klog.V(2).InfoS("Added local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers.Insert(e.srcNode.String()) + klog.V(2).InfoS("Added remote member from multicast group", "group", e.group.String(), "member", e.srcNode) + } + return status +} + +func deleteGroupMember(status *GroupMemberStatus, e *mcastGroupEvent) *GroupMemberStatus { + if e.iface.Type == interfacestore.ContainerInterface { + delete(status.localMembers, e.iface.InterfaceName) + klog.V(2).InfoS("Deleted local member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName) + } else { + status.remoteMembers.Delete(e.srcNode.String()) + klog.V(2).InfoS("Deleted remote member from multicast group", "group", e.group.String(), "member", e.srcNode) + } + return status +} diff --git a/pkg/agent/multicast/mcast_controller_test.go 
b/pkg/agent/multicast/mcast_controller_test.go index a0cfd86c814..af1ec593150 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -1,6 +1,3 @@ -//go:build linux -// +build linux - // Copyright 2021 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,19 +15,26 @@ package multicast import ( + "context" + "fmt" "net" + "os" "sync" "testing" "time" - "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/agent/config" @@ -53,6 +57,8 @@ var ( mockIfaceStore *ifaceStoretest.MockInterfaceStore mockMulticastValidator *typestest.MockMcastNetworkPolicyController ovsClient *ovsconfigtest.MockOVSBridgeClient + clientset *fake.Clientset + informerFactory informers.SharedInformerFactory if1 = &interfacestore.InterfaceConfig{ Type: interfacestore.ContainerInterface, InterfaceName: "if1", @@ -69,10 +75,7 @@ var ( OFPort: 2, }, } - nodeIf1IP = net.ParseIP("192.168.20.22") - externalInterfaceIP = net.ParseIP("192.168.50.23") - pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") - pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") + nodeIf1IP = net.ParseIP("192.168.20.22") ) func TestAddGroupMemberStatus(t *testing.T) { @@ -83,7 +86,7 @@ func TestAddGroupMemberStatus(t *testing.T) { time: time.Now(), iface: if1, } - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: 
&net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -97,7 +100,7 @@ func TestAddGroupMemberStatus(t *testing.T) { assert.True(t, ok) assert.Equal(t, mgroup.String(), key) mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) err = mctrl.syncGroup(key) @@ -106,10 +109,9 @@ func TestAddGroupMemberStatus(t *testing.T) { } func TestUpdateGroupMemberStatus(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) err := mctrl.initialize(t) assert.Nil(t, err) - igmpMaxResponseTime = time.Second * 1 mgroup := net.ParseIP("224.96.1.4") event := &mcastGroupEvent{ group: mgroup, @@ -143,9 +145,8 @@ func TestUpdateGroupMemberStatus(t *testing.T) { } func TestCheckLastMember(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := newMockMulticastController(t, false) workerCount = 1 - igmpMaxResponseTime = time.Second * 1 lastProbe := time.Now() mgroup := net.ParseIP("224.96.1.2") testCheckLastMember := func(ev *mcastGroupEvent, expExist bool) { @@ -195,7 +196,7 @@ func TestCheckLastMember(t *testing.T) { mctrl.queue.Forget(obj) } mockIfaceStore.EXPECT().GetInterfaceByName(if1.InterfaceName).Return(if1, true).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) for _, tc := range []struct { ev *mcastGroupEvent exists bool @@ -209,7 +210,7 @@ func TestCheckLastMember(t *testing.T) { } func TestClearStaleGroups(t *testing.T) { - mctrl := newMockMulticastController(t) + mctrl := 
newMockMulticastController(t, false) workerCount = 1 err := mctrl.initialize(t) assert.Nil(t, err) @@ -253,12 +254,14 @@ func TestClearStaleGroups(t *testing.T) { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) } fakePort := int32(1) for _, g := range staleGroups { err := mctrl.groupCache.Add(g) assert.Nil(t, err) mctrl.addInstalledGroup(g.group.String()) + mctrl.addInstalledLocalGroup(g.group.String()) for m := range g.localMembers { mockIface := &interfacestore.InterfaceConfig{InterfaceName: m, OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: fakePort}} mockIfaceStore.EXPECT().GetInterfaceByName(m).Return(mockIface, true) @@ -283,7 +286,7 @@ func TestClearStaleGroups(t *testing.T) { } func TestProcessPacketIn(t *testing.T) { - mockController := newMockMulticastController(t) + mockController := newMockMulticastController(t, false) snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) @@ -449,6 +452,291 @@ func TestProcessPacketIn(t *testing.T) { } } +func TestEncapModeInitialize(t *testing.T) { + mockController := newMockMulticastController(t, true) + assert.True(t, mockController.nodeGroupID != 0) + err := mockController.initialize(t) + assert.Nil(t, err) +} + +func TestEncapLocalReportAndNotifyRemote(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ + {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, + } + stopCh := make(chan struct{}) + defer close(stopCh) + + go wait.Until(mockController.worker, time.Second, stopCh) + + iface1 := createInterface("pod1", 3) + iface2 := createInterface("pod2", 4) + mgroup := net.ParseIP("224.2.100.4") + for _, tc := range []struct { + e *mcastGroupEvent + interfaces []*interfacestore.InterfaceConfig + 
groupChanged bool + ifaceCheck bool + }{ + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: false, ifaceCheck: false}, + {e: &mcastGroupEvent{group: mgroup, eType: groupJoin, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface2}, interfaces: []*interfacestore.InterfaceConfig{iface1, iface2}, groupChanged: false, ifaceCheck: true}, + {e: &mcastGroupEvent{group: mgroup, eType: groupLeave, time: time.Now(), iface: iface1}, interfaces: []*interfacestore.InterfaceConfig{iface1}, groupChanged: true, ifaceCheck: true}, + } { + groupKey := tc.e.group.String() + if tc.e.eType == groupJoin { + if tc.groupChanged { + mockMulticastSocket.EXPECT().MulticastInterfaceJoinMgroup(mgroup.To4(), nodeIf1IP.To4(), if1.InterfaceName).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + mockOFClient.EXPECT().InstallMulticastFlows(mgroup, gomock.Any()).Times(1) + } + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } else { + if tc.ifaceCheck { + for _, iface := range tc.interfaces { + mockIfaceStore.EXPECT().GetInterfaceByName(iface.InterfaceName).Return(iface, true) + } + if len(tc.interfaces) == 1 { + mockOFClient.EXPECT().SendIGMPQueryPacketOut(igmpQueryDstMac, types.McastAllHosts, gomock.Any(), gomock.Any()).AnyTimes() + } + if !tc.groupChanged { + 
mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any(), gomock.Any()) + } + } + if tc.groupChanged { + mockOFClient.EXPECT().UninstallGroup(gomock.Any()) + mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group) + mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()) + } + } + mockController.addOrUpdateGroupEvent(tc.e) + + if tc.groupChanged { + err := wait.PollImmediate(time.Millisecond*100, time.Second*3, func() (done bool, err error) { + if tc.e.eType == groupJoin { + return mockController.localGroupHasInstalled(groupKey) && mockController.groupHasInstalled(groupKey), nil + } else { + return !mockController.localGroupHasInstalled(groupKey) && !mockController.groupHasInstalled(groupKey), nil + } + }) + assert.Nil(t, err) + } else { + time.Sleep(time.Millisecond * 200) + } + } +} + +func TestNodeUpdate(t *testing.T) { + mockController := newMockMulticastController(t, true) + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + informerFactory.WaitForCacheSync(stopCh) + mockController.addInstalledLocalGroup("224.2.100.1") + + wg := sync.WaitGroup{} + for _, tc := range []struct { + addedNodes map[string]map[string]string + deletedNodes []string + expectedNodes sets.String + }{ + { + addedNodes: map[string]map[string]string{ + "n1": {"ip": "10.10.10.11"}, + "n2": {"ip": "10.10.10.12"}, + }, + expectedNodes: sets.NewString("10.10.10.11", "10.10.10.12"), + }, + { + addedNodes: map[string]map[string]string{ + "n3": {"ip": "10.10.10.13"}, + }, + expectedNodes: sets.NewString("10.10.10.11", "10.10.10.12", "10.10.10.13"), + }, + { + deletedNodes: []string{ + "n1", + }, + expectedNodes: sets.NewString("10.10.10.12", "10.10.10.13"), + }, + { + addedNodes: map[string]map[string]string{ + "n4": {"ip": "10.10.10.14", "label": "10.10.10.24"}, + }, + deletedNodes: 
[]string{ + "n2", + }, + expectedNodes: sets.NewString("10.10.10.13", "10.10.10.24"), + }, + } { + times := len(tc.addedNodes) + len(tc.deletedNodes) + mockOFClient.EXPECT().InstallMulticastGroup(mockController.nodeGroupID, nil, gomock.Any()).Return(nil).Times(times) + mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any()).Times(times) + wg.Add(1) + + go func() { + for name, cfg := range tc.addedNodes { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + ip, exist := cfg["ip"] + if exist { + node.Status = corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: ip, + }, + }, + } + } + label, exist := cfg["label"] + if exist { + node.Annotations = map[string]string{ + types.NodeTransportAddressAnnotationKey: label, + } + } + clientset.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + mockController.processNextNodeItem() + } + for _, name := range tc.deletedNodes { + clientset.CoreV1().Nodes().Delete(context.TODO(), name, metav1.DeleteOptions{}) + mockController.processNextNodeItem() + } + wg.Done() + }() + + wg.Wait() + assert.Equal(t, tc.expectedNodes, mockController.installedNodes, fmt.Sprintf("installedNodes: %v, expectedNodes: %v", mockController.installedNodes, tc.expectedNodes)) + } +} + +func TestRemoteMemberJoinLeave(t *testing.T) { + mockController := newMockMulticastController(t, true) + _ = mockController.initialize(t) + stopCh := make(chan struct{}) + defer close(stopCh) + + stopStr := "done" + eventHandler := func(stopCh <-chan struct{}) { + for { + select { + case e := <-mockController.groupEventCh: + if e.group.Equal(net.IPv4zero) { + mockController.queue.Add(stopStr) + } else { + mockController.addOrUpdateGroupEvent(e) + } + case <-stopCh: + return + } + } + } + go eventHandler(stopCh) + + for _, tc := range []struct { + groupStrs []string + nodeStr string + isJoin bool + }{ + {groupStrs: 
[]string{"224.2.100.2", "224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.3"}, nodeStr: "10.10.10.11", isJoin: true}, + {groupStrs: []string{"224.2.100.2", "224.2.100.5"}, nodeStr: "10.10.10.12", isJoin: true}, + {groupStrs: []string{"224.2.100.2"}, nodeStr: "10.10.10.12", isJoin: false}, + } { + groups := make([]net.IP, len(tc.groupStrs)) + for i, g := range tc.groupStrs { + groups[i] = net.ParseIP(g) + } + node := net.ParseIP(tc.nodeStr) + testRemoteReport(t, mockController, groups, node, tc.isJoin, stopStr) + } +} + +func testRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, node net.IP, nodeJoin bool, stopStr string) { + tunnelPort := uint32(2) + proto := uint8(protocol.IGMPIsEx) + if !nodeJoin { + proto = uint8(protocol.IGMPToIn) + } + for _, g := range groups { + var exists bool + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + if !exists { + mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) + } else { + status := obj.(*GroupMemberStatus) + exists = status.remoteMembers.Has(node.String()) + if nodeJoin && exists || !nodeJoin && !exists { + continue + } + } + mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), []uint32{config.HostGatewayOFPort}, gomock.Any()) + } + + processNextItem := func(stopStr string) { + for { + obj, quit := mockController.queue.Get() + if quit { + return + } + key := obj.(string) + if key == stopStr { + mockController.queue.Forget(key) + mockController.queue.Done(obj) + return + } + if err := mockController.syncGroup(key); err != nil { + t.Errorf("Failed to process %s: %v", key, err) + } + mockController.queue.Forget(key) + mockController.queue.Done(obj) + } + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + processNextItem(stopStr) + wg.Done() + }() + + err := processRemoteReport(t, mockController, groups, node, proto, tunnelPort) + assert.Nil(t, err) + mockController.groupEventCh <- &mcastGroupEvent{group: net.IPv4zero} 
+ wg.Wait() + + for _, g := range groups { + obj, exists, _ := mockController.groupCache.GetByKey(g.String()) + assert.True(t, exists) + status := obj.(*GroupMemberStatus) + if nodeJoin { + assert.True(t, status.remoteMembers.Has(node.String())) + } else { + assert.False(t, status.remoteMembers.Has(node.String())) + } + } + for _, g := range groups { + assert.True(t, mockController.groupHasInstalled(g.String())) + } +} + +func processRemoteReport(t *testing.T, mockController *Controller, groups []net.IP, remoteNode net.IP, reportType uint8, tunnelPort uint32) error { + pkt := generatePacketInForRemoteReport(t, mockController.igmpSnooper, groups, remoteNode, reportType, tunnelPort) + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, nodeIf1IP), true) + return mockController.igmpSnooper.processPacketIn(&pkt) +} + func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) { obj, exits, err := cache.GetByKey(event.group.String()) assert.Nil(t, err) @@ -467,7 +755,7 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven } } -func newMockMulticastController(t *testing.T) *Controller { +func newMockMulticastController(t *testing.T, isEncap bool) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) @@ -479,17 +767,24 @@ func newMockMulticastController(t *testing.T) *Controller { mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) groupAllocator := openflow.NewGroupAllocator(false) podUpdateSubscriber := channel.NewSubscribableChannel("PodUpdate", 100) - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator) + + clientset = fake.NewSimpleClientset() + informerFactory = 
informers.NewSharedInformerFactory(clientset, 12*time.Hour) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.NewString(), ovsClient, podUpdateSubscriber, time.Second*5, mockMulticastValidator, isEncap, informerFactory) return mctrl } func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastInitialFlows(uint8(0)).Times(1) - mockOFClient.EXPECT().InstallMulticastGroup(gomock.Any(), gomock.Any()) + mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{}) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(0)).Times(1).Return([]uint16{0}, nil) mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil) + if c.encapEnabled { + mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallMulticastRemoteReportFlows(c.nodeGroupID).Times(1) + } return c.Initialize() } @@ -509,32 +804,13 @@ func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig func createIGMPReportPacketIn(joinedGroups []net.IP, leftGroups []net.IP, version uint8, ofport uint32) []*ofctrl.PacketIn { joinMessages := createIGMPJoinMessage(joinedGroups, version) leaveMessages := createIGMPLeaveMessage(leftGroups, version) - generatePacket := func(m util.Message) ofctrl.PacketIn { - pkt := openflow13.NewPacketIn() - matchInport := openflow13.NewInPortField(ofport) - pkt.Match.AddField(*matchInport) - ipPacket := &protocol.IPv4{ - Version: 0x4, - IHL: 5, - Protocol: IGMPProtocolNumber, - Length: 20 + m.Len(), - Data: m, - } - pkt.Data = protocol.Ethernet{ - HWDst: pktInDstMAC, - HWSrc: pktInSrcMAC, - Ethertype: 
protocol.IPv4_MSG, - Data: ipPacket, - } - return ofctrl.PacketIn(*pkt) - } pkts := make([]*ofctrl.PacketIn, 0) for _, m := range joinMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } for _, m := range leaveMessages { - pkt := generatePacket(m) + pkt := generatePacket(m, ofport, nil) pkts = append(pkts, &pkt) } return pkts @@ -578,3 +854,8 @@ func createIGMPJoinMessage(groups []net.IP, version uint8) []util.Message { } return pkts } + +func TestMain(m *testing.M) { + igmpMaxResponseTime = time.Second + os.Exit(m.Run()) +} diff --git a/pkg/agent/multicast/mcast_discovery.go b/pkg/agent/multicast/mcast_discovery.go index d6ffcef5a75..648163dd367 100644 --- a/pkg/agent/multicast/mcast_discovery.go +++ b/pkg/agent/multicast/mcast_discovery.go @@ -39,6 +39,11 @@ const ( IGMPProtocolNumber = 2 ) +const ( + openflowKeyTunnelSrc = "NXM_NX_TUN_IPV4_SRC" + openflowKeyInPort = "OXM_OF_IN_PORT" +) + var ( // igmpMaxResponseTime is the maximum time allowed before sending a responding report which is used for the // "Max Resp Code" field in the IGMP query message. It is also the maximum time to wait for the IGMP report message @@ -46,6 +51,8 @@ var ( igmpMaxResponseTime = time.Second * 10 // igmpQueryDstMac is the MAC address used in the dst MAC field in the IGMP query message igmpQueryDstMac, _ = net.ParseMAC("01:00:5e:00:00:01") + // igmpReportDstMac is the MAC address used in the dst MAC field in the IGMP report message + igmpReportDstMac, _ = net.ParseMAC("01:00:5e:00:00:16") ) type IGMPSnooper struct { @@ -62,6 +69,7 @@ type IGMPSnooper struct { // Similar to igmpReportANPStats, it stores ACNP stats for IGMP reports. 
igmpReportACNPStats map[apitypes.UID]map[string]*types.RuleMetric igmpReportACNPStatsMutex sync.Mutex + encapEnabled bool } func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error { @@ -92,7 +100,7 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32, func (s *IGMPSnooper) parseSrcInterface(pktIn *ofctrl.PacketIn) (*interfacestore.InterfaceConfig, error) { matches := pktIn.GetMatches() - ofPortField := matches.GetMatchByName("OXM_OF_IN_PORT") + ofPortField := matches.GetMatchByName(openflowKeyInPort) if ofPortField == nil { return nil, errors.New("in_port field not found") } @@ -127,6 +135,11 @@ func (s *IGMPSnooper) validate(event *mcastGroupEvent, igmpType uint8, packetInD // Return true directly if there is no validator. return true, nil } + // MulticastValidator only validates the IGMP report message sent from Pods. The report message received from tunnel + // port is sent from Antrea Agent on a different Node, and returns true directly. + if event.iface.Type == interfacestore.TunnelInterface { + return true, nil + } if event.iface.Type != interfacestore.ContainerInterface { return true, fmt.Errorf("interface is not container") } @@ -201,6 +214,42 @@ func (s *IGMPSnooper) collectStats() (igmpANPStats, igmpACNPStats map[apitypes.U return igmpANPStats, igmpACNPStats } +func (s *IGMPSnooper) sendIGMPReport(groupRecordType uint8, groups []net.IP) error { + igmp, err := s.generateIGMPReportPacket(groupRecordType, groups) + if err != nil { + return err + } + if err := s.ofClient.SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, igmp); err != nil { + return err + } + klog.V(2).InfoS("Sent packetOut for IGMP v3 report", "groups", groups) + return nil +} + +func (s *IGMPSnooper) generateIGMPReportPacket(groupRecordType uint8, groups []net.IP) (util.Message, error) { + records := make([]protocol.IGMPv3GroupRecord, len(groups)) + for i, group := range groups { + records[i] = protocol.IGMPv3GroupRecord{ + Type: 
groupRecordType, + MulticastAddress: group, + } + } + return &protocol.IGMPv3MembershipReport{ + Type: protocol.IGMPv3Report, + Checksum: 0, + NumberOfGroups: uint16(len(records)), + GroupRecords: records, + }, nil +} + +func (s *IGMPSnooper) sendIGMPJoinReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPIsEx, groups) +} + +func (s *IGMPSnooper) sendIGMPLeaveReport(groups []net.IP) error { + return s.sendIGMPReport(protocol.IGMPToIn, groups) +} + func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { now := time.Now() iface, err := s.parseSrcInterface(pktIn) @@ -209,8 +258,15 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { } klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort) podName := "unknown" + var srcNode net.IP if iface.Type == interfacestore.ContainerInterface { podName = iface.PodName + } else if iface.Type == interfacestore.TunnelInterface { + var err error + srcNode, err = s.parseSrcNode(pktIn) + if err != nil { + return err + } } igmp, err := parseIGMPPacket(pktIn.Data) if err != nil { @@ -240,10 +296,11 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { evtType = groupLeave } event := &mcastGroupEvent{ - group: mgroup, - eType: evtType, - time: now, - iface: iface, + group: mgroup, + eType: evtType, + time: now, + iface: iface, + srcNode: srcNode, } s.validatePacketAndNotify(event, igmpType, pktIn.Data) } @@ -261,6 +318,16 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error { return nil } +func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) { + matches := pktIn.GetMatches() + tunSrcField := matches.GetMatchByName(openflowKeyTunnelSrc) + if tunSrcField == nil { + return nil, errors.New("in_port field not found") + } + tunSrc := tunSrcField.GetValue().(net.IP) + return tunSrc, nil +} + func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) { // The max 
response time field in IGMP protocol uses a value in units of 1/10 second. // See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376 @@ -332,8 +399,8 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) { } } -func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController) *IGMPSnooper { - snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval} +func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController, encapEnabled bool) *IGMPSnooper { + snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval, encapEnabled: encapEnabled} snooper.igmpReportACNPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) snooper.igmpReportANPStats = make(map[apitypes.UID]map[string]*types.RuleMetric) ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonMC), "MulticastGroupDiscovery", snooper) diff --git a/pkg/agent/multicast/mcast_discovery_test.go b/pkg/agent/multicast/mcast_discovery_test.go new file mode 100644 index 00000000000..3af352277f3 --- /dev/null +++ b/pkg/agent/multicast/mcast_discovery_test.go @@ -0,0 +1,185 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "net" + "sync" + "testing" + + "antrea.io/libOpenflow/openflow13" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" + + "antrea.io/antrea/pkg/agent/interfacestore" + ifaceStoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +var ( + pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66") + pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16") +) + +type snooperValidator struct { + eventCh chan *mcastGroupEvent + groupJoinedNodes map[string]sets.String + groupLeftNodes map[string]sets.String +} + +func (v *snooperValidator) processPackets(expectedPackets int) { + appendSrcNode := func(groupKey string, groupNodes map[string]sets.String, nodeIP net.IP) map[string]sets.String { + _, exists := groupNodes[groupKey] + if !exists { + groupNodes[groupKey] = sets.NewString() + } + groupNodes[groupKey] = groupNodes[groupKey].Insert(nodeIP.String()) + return groupNodes + } + for i := 0; i < expectedPackets; i++ { + select { + case e := <-v.eventCh: + groupKey := e.group.String() + if e.eType == groupJoin { + v.groupJoinedNodes = appendSrcNode(groupKey, v.groupJoinedNodes, e.srcNode) + } else { + v.groupLeftNodes = appendSrcNode(groupKey, v.groupLeftNodes, e.srcNode) + } + } + } +} + +func TestIGMPRemoteReport(t *testing.T) { + controller := gomock.NewController(t) 
+ mockOFClient := openflowtest.NewMockClient(controller) + mockIfaceStore := ifaceStoretest.NewMockInterfaceStore(controller) + eventCh := make(chan *mcastGroupEvent, 100) + snooper := &IGMPSnooper{ofClient: mockOFClient, eventCh: eventCh, ifaceStore: mockIfaceStore} + + localNodeIP := net.ParseIP("1.2.3.4") + tunnelPort := uint32(1) + wg := sync.WaitGroup{} + + generateRemotePackets := func(groups []net.IP, nodes []net.IP, igmpMsgType uint8) []ofctrl.PacketIn { + packets := make([]ofctrl.PacketIn, 0, len(nodes)) + for _, srcNode := range nodes { + pkt := generatePacketInForRemoteReport(t, snooper, groups, srcNode, igmpMsgType, tunnelPort) + packets = append(packets, pkt) + } + return packets + } + validateGroupNodes := func(groups []net.IP, expectedNodesIPs []net.IP, testGroupNodes map[string]sets.String) { + if len(expectedNodesIPs) == 0 { + return + } + for _, g := range groups { + expectedNodes := sets.NewString() + for _, n := range expectedNodesIPs { + expectedNodes.Insert(n.String()) + } + nodes, exists := testGroupNodes[g.String()] + assert.True(t, exists) + assert.True(t, nodes.HasAll(expectedNodes.List()...)) + } + } + testPacketProcess := func(groups []net.IP, joinedNodes []net.IP, leftNodes []net.IP) { + validator := snooperValidator{eventCh: eventCh, groupJoinedNodes: make(map[string]sets.String), groupLeftNodes: make(map[string]sets.String)} + packets := make([]ofctrl.PacketIn, 0, len(joinedNodes)+len(leftNodes)) + packets = append(packets, generateRemotePackets(groups, joinedNodes, protocol.IGMPIsEx)...) + packets = append(packets, generateRemotePackets(groups, leftNodes, protocol.IGMPToIn)...) 
+ + eventCount := len(groups) * len(packets) + wg.Add(1) + go func() { + validator.processPackets(eventCount) + wg.Done() + }() + + mockIfaceStore.EXPECT().GetInterfaceByOFPort(tunnelPort).Return(createTunnelInterface(tunnelPort, localNodeIP), true).Times(len(packets)) + for i := range packets { + pkt := &packets[i] + err := snooper.processPacketIn(pkt) + assert.Nil(t, err, "Failed to process IGMP Report message") + } + + wg.Wait() + + validateGroupNodes(groups, joinedNodes, validator.groupJoinedNodes) + validateGroupNodes(groups, leftNodes, validator.groupLeftNodes) + } + + for _, tc := range []struct { + groupsStrings []string + joinedNodesStrings []string + leftNodesStrings []string + }{ + {groupsStrings: []string{"225.1.2.3", "225.1.2.4"}, joinedNodesStrings: []string{"1.2.3.5", "1.2.3.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + {groupsStrings: []string{"225.1.2.5"}, joinedNodesStrings: []string{"1.2.3.5"}}, + {groupsStrings: []string{"225.1.2.6"}, leftNodesStrings: []string{"1.2.3.6"}}, + } { + var groups, joinedNodes, leftNodes []net.IP + for _, g := range tc.groupsStrings { + groups = append(groups, net.ParseIP(g)) + } + for _, n := range tc.joinedNodesStrings { + joinedNodes = append(joinedNodes, net.ParseIP(n)) + } + for _, n := range tc.leftNodesStrings { + leftNodes = append(leftNodes, net.ParseIP(n)) + } + testPacketProcess(groups, joinedNodes, leftNodes) + } +} + +func generatePacket(m util.Message, ofport uint32, srcNodeIP net.IP) ofctrl.PacketIn { + pkt := openflow13.NewPacketIn() + matchInport := openflow13.NewInPortField(ofport) + pkt.Match.AddField(*matchInport) + if srcNodeIP != nil { + matchTunSrc := openflow13.NewTunnelIpv4SrcField(srcNodeIP, nil) + pkt.Match.AddField(*matchTunSrc) + } + ipPacket := &protocol.IPv4{ + Version: 0x4, + IHL: 5, + Protocol: IGMPProtocolNumber, + Length: 20 + m.Len(), + Data: m, + } + pkt.Data = protocol.Ethernet{ + HWDst: pktInDstMAC, + HWSrc: pktInSrcMAC, + Ethertype: protocol.IPv4_MSG, + Data: ipPacket, + } + 
return ofctrl.PacketIn(*pkt) +} + +func generatePacketInForRemoteReport(t *testing.T, snooper *IGMPSnooper, groups []net.IP, srcNode net.IP, igmpMsgType uint8, tunnelPort uint32) ofctrl.PacketIn { + msg, err := snooper.generateIGMPReportPacket(igmpMsgType, groups) + assert.Nil(t, err, "Failed to generate IGMP Report message") + return generatePacket(msg, tunnelPort, srcNode) +} + +func createTunnelInterface(tunnelPort uint32, localNodeIP net.IP) *interfacestore.InterfaceConfig { + tunnelInterface := interfacestore.NewTunnelInterface("antrea-tun0", ovsconfig.GeneveTunnel, localNodeIP, false) + tunnelInterface.OVSPortConfig = &interfacestore.OVSPortConfig{OFPort: int32(tunnelPort)} + return tunnelInterface +} diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index 279c11c4f19..bf2b9ea1251 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -32,7 +32,7 @@ const ( MulticastRecvBufferSize = 128 ) -func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String) *MRouteClient { +func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.String, encapEnabled bool) *MRouteClient { var m = &MRouteClient{ igmpMsgChan: make(chan []byte, workerCount), nodeConfig: nodeconfig, @@ -83,11 +83,11 @@ type MRouteClient struct { // by making these interfaces accept multicast traffic with multicast ip:mgroup. 
// https://tldp.org/HOWTO/Multicast-HOWTO-6.html#ss6.4 func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceJoinMgroup(groupIP, addrIP, config.Name) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "address already in use") { return err } } @@ -95,9 +95,9 @@ func (c *MRouteClient) multicastInterfacesJoinMgroup(mgroup net.IP) error { } func (c *MRouteClient) multicastInterfacesLeaveMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() for _, config := range c.multicastInterfaceConfigs { addrIP := config.IPv4Addr.IP.To4() - groupIP := mgroup.To4() err := c.socket.MulticastInterfaceLeaveMgroup(groupIP, addrIP, config.Name) if err != nil { return err } @@ -158,7 +158,7 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) } // addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces, -// allowing multicast sender Pods to send multicast traffic to external. +// allowing multicast sender Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) { klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs) err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs) diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index b9f4c14d69c..4fff580436c 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -33,9 +33,10 @@ import ( ) var ( - addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} - addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} - nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} + externalInterfaceIP = net.ParseIP("192.168.50.23") + addrIf1 = &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)} + addrIf2 = &net.IPNet{IP: externalInterfaceIP, Mask: net.IPv4Mask(255, 255, 255, 0)} + nodeConfig = &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addrIf1} ) func TestParseIGMPMsg(t *testing.T) { @@ -117,7 +118,7 @@ func newMockMulticastRouteClient(t *testing.T) *MRouteClient { groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName)) + return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.NewString(if1.InterfaceName), false) } func (c *MRouteClient) initialize(t *testing.T) error { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index d4995f2117d..1f5fe3be0ab 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -19,6 +19,7 @@ import ( "math/rand" "net" + "antrea.io/libOpenflow/openflow13" "antrea.io/libOpenflow/protocol" ofutil 
"antrea.io/libOpenflow/util" v1 "k8s.io/api/core/v1" @@ -280,11 +281,19 @@ type Client interface { // InstallMulticastInitialFlows installs OpenFlow to packetIn the IGMP messages and output the Multicast traffic to // antrea-gw0 so that local Pods could access external Multicast servers. InstallMulticastInitialFlows(pktInReason uint8) error + // InstallMulticastFlows installs the flow to forward Multicast traffic normally, and output it to antrea-gw0 // to ensure it can be forwarded to the external addresses. InstallMulticastFlows(multicastIP net.IP, groupID binding.GroupIDType) error + // UninstallMulticastFlows removes the flow matching the given multicastIP. UninstallMulticastFlows(multicastIP net.IP) error + + // InstallMulticastRemoteReportFlows installs flows to forward the IGMP report messages to the other Nodes, + // and packetIn the report messages to Antrea Agent which is received via tunnel port. + // The OpenFlow group identified by groupID is used to forward packet to all other Nodes in the cluster + // over tunnel. + InstallMulticastRemoteReportFlows(groupID binding.GroupIDType) error // SendIGMPQueryPacketOut sends the IGMPQuery packet as a packet-out to OVS from the gateway port. SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, @@ -304,7 +313,13 @@ type Client interface { // UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port. UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error - InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32) error + InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error + + // SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port. 
+ SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error // InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular // Node and a local Gateway. @@ -800,7 +815,7 @@ func (c *client) generatePipelines() { if c.enableMulticast { // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } @@ -1190,6 +1205,15 @@ func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { return c.deleteFlows(c.featureMulticast.cachedFlows, cacheKey) } +func (c *client) InstallMulticastRemoteReportFlows(groupID binding.GroupIDType) error { + firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() + flows := c.featureMulticast.multicastRemoteReportFlows(groupID, firstMulticastTable) + cacheKey := "multicast_encap" + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) +} + func (c *client) SendIGMPQueryPacketOut( dstMAC net.HardwareAddr, dstIP net.IP, @@ -1240,7 +1264,25 @@ func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) erro return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) } -func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32) error { +func (c *client) SendIGMPRemoteReportPacketOut( + dstMAC net.HardwareAddr, + dstIP net.IP, + igmp ofutil.Message) error { + srcMAC := c.nodeConfig.GatewayConfig.MAC.String() + srcIP := c.nodeConfig.NodeTransportIPv4Addr.IP.String() + 
dstMACStr := dstMAC.String() + dstIPStr := dstIP.String() + packetOutBuilder, err := setBasePacketOutBuilder(c.bridge.BuildPacketOut(), srcMAC, dstMACStr, srcIP, dstIPStr, openflow13.P_CONTROLLER, 0) + if err != nil { + return err + } + // Set protocol, L4 message, and target OF Group ID. + packetOutBuilder = packetOutBuilder.SetIPProtocol(binding.ProtocolIGMP).SetL4Packet(igmp) + packetOutObj := packetOutBuilder.Done() + return c.bridge.SendPacketOut(packetOutObj) +} + +func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() table := MulticastOutputTable @@ -1248,7 +1290,7 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive table = MulticastIngressRuleTable } - if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers...); err != nil { + if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil { return err } return nil diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 79df3424a88..5d3de4bd05f 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -19,8 +19,10 @@ import ( "net" "sync" + "antrea.io/libOpenflow/openflow13" "k8s.io/klog/v2" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -31,6 +33,8 @@ type featureMulticast struct { ipProtocols []binding.Protocol bridge binding.Bridge gatewayPort uint32 + encapEnabled bool + tunnelPort uint32 cachedFlows *flowCategoryCache groupCache sync.Map @@ -43,7 +47,7 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32) 
*featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32) *featureMulticast { return &featureMulticast{ cookieAllocator: cookieAllocator, ipProtocols: ipProtocols, @@ -53,6 +57,8 @@ func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding groupCache: sync.Map{}, enableAntreaPolicy: anpEnabled, gatewayPort: gwPort, + encapEnabled: encapEnabled, + tunnelPort: tunnelPort, } } @@ -68,9 +74,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b func (f *featureMulticast) initFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() - return []binding.Flow{ - f.multicastOutputFlow(cookieID), - } + return f.multicastOutputFlows(cookieID) } func (f *featureMulticast) replayFlows() []binding.Flow { @@ -78,7 +82,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports ...uint32) error { +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error { group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() for i := range ports { group = group.Bucket(). @@ -87,6 +91,14 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(tableID). Done() } + for _, ip := range remoteIPs { + group = group.Bucket(). + LoadToRegField(OFPortFoundRegMark.GetField(), OFPortFoundRegMark.GetValue()). + LoadToRegField(TargetOFPortField, f.tunnelPort). + SetTunnelDst(ip). + ResubmitToTable(MulticastOutputTable.GetID()). 
+ Done() + } if err := group.Add(); err != nil { return fmt.Errorf("error when installing Multicast receiver Group: %w", err) } @@ -94,12 +106,38 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, return nil } -func (f *featureMulticast) multicastOutputFlow(cookieID uint64) binding.Flow { - return MulticastOutputTable.ofTable.BuildFlow(priorityNormal). - Cookie(cookieID). - MatchRegMark(OFPortFoundRegMark). - Action().OutputToRegField(TargetOFPortField). - Done() +func (f *featureMulticast) multicastOutputFlows(cookieID uint64) []binding.Flow { + flows := []binding.Flow{ + MulticastOutputTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), + } + if f.encapEnabled { + // When running with encap mode, drop the multicast packets if they are received from the tunnel port and expected to + // output to antrea-gw0, or received from antrea-gw0 and expected to output to tunnel. These flows are used to + // avoid duplication on packet forwarding. For example, if the packet is received on the tunnel port, it means + // the sender is a Pod on another Node, then the packet is already sent to external via antrea-gw0 on the source + // Node. Conversely, if the packet is received on antrea-gw0, it means the sender is from external, then + // the Pod receivers on other Nodes should also receive the packets from the underlay network. + flows = append(flows, MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromTunnelRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.HostGatewayOFPort). + Action().Drop(). + Done(), + MulticastOutputTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchRegMark(FromGatewayRegMark). + MatchRegMark(OFPortFoundRegMark). + MatchRegFieldWithValue(TargetOFPortField, config.DefaultTunOFPort). + Action().Drop().
+ Done(), + ) + } + return flows } func (f *featureMulticast) multicastSkipIGMPMetricFlows() []binding.Flow { @@ -147,3 +185,34 @@ func (f *featureMulticast) replayGroups() { return true }) } + +func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { + return []binding.Flow{ + // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to + // broadcast to all the other Nodes in the cluster. The multicast groups inside the IGMP report message + // include the ones local Pods have joined. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchInPort(openflow13.P_CONTROLLER). + Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().Group(groupID). + Done(), + // This flow allows the IGMP report message sent from Antrea Agent to bypass the check in SpoofGuardTable. + ClassifierTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(openflow13.P_CONTROLLER). + Action().GotoTable(SpoofGuardTable.GetNext()). + Done(), + // This flow allows the multicast packet sent from a different Node via the tunnel port to enter the Multicast + // pipeline. + ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(f.tunnelPort). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*types.McastCIDR). + Action().LoadRegMark(FromTunnelRegMark). + Action().GotoTable(firstMulticastTable.GetID()).
+ Done(), + } +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 3fdb03c5f69..b928ea13bef 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2666,20 +2666,24 @@ func pipelineClassifyFlow(cookieID uint64, protocol binding.Protocol, pipeline b // igmpPktInFlows generates the flow to load CustomReasonIGMPRegMark to mark the IGMP packet in MulticastRoutingTable // and sends it to antrea-agent. func (f *featureMulticast) igmpPktInFlows(reason uint8) []binding.Flow { - flows := []binding.Flow{ - // Set a custom reason for the IGMP packets, and then send it to antrea-agent and forward it normally in the - // OVS bridge, so that the OVS multicast db cache can be updated, and antrea-agent can identify the local multicast - // group and its members in the meanwhile. - // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in - // the packet. - MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). - Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(binding.ProtocolIGMP). - MatchRegMark(FromLocalRegMark). - Action().LoadRegMark(CustomReasonIGMPRegMark). - Action().SendToController(reason). - Action().Normal(). - Done(), + var flows []binding.Flow + sourceMarks := []*binding.RegMark{FromLocalRegMark} + if f.encapEnabled { + sourceMarks = append(sourceMarks, FromTunnelRegMark) + } + for _, m := range sourceMarks { + flows = append(flows, + // Set a custom reason for the IGMP packets, and then send it to antrea-agent. Then antrea-agent can identify + // the local multicast group and its members in the meanwhile. + // Do not set dst IP address because IGMPv1 report message uses target multicast group as IP destination in + // the packet. + MulticastRoutingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(binding.ProtocolIGMP). + MatchRegMark(m). 
+ Action().LoadRegMark(CustomReasonIGMPRegMark). + Action().SendToController(reason). + Done()) } return flows } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f59dc19007f..cfb9be05fd8 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -297,17 +297,17 @@ func (mr *MockClientMockRecorder) InstallMulticastFlows(arg0, arg1 interface{}) } // InstallMulticastGroup mocks base method -func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32) error { +func (m *MockClient) InstallMulticastGroup(arg0 openflow.GroupIDType, arg1 []uint32, arg2 []net.IP) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1) + ret := m.ctrl.Call(m, "InstallMulticastGroup", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } // InstallMulticastGroup indicates an expected call of InstallMulticastGroup -func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticastGroup(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastGroup", reflect.TypeOf((*MockClient)(nil).InstallMulticastGroup), arg0, arg1, arg2) } // InstallMulticastInitialFlows mocks base method @@ -324,6 +324,20 @@ func (mr *MockClientMockRecorder) InstallMulticastInitialFlows(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastInitialFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastInitialFlows), arg0) } +// InstallMulticastRemoteReportFlows mocks base method +func (m *MockClient) InstallMulticastRemoteReportFlows(arg0 openflow.GroupIDType) error { + m.ctrl.T.Helper() + ret := 
m.ctrl.Call(m, "InstallMulticastRemoteReportFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastRemoteReportFlows indicates an expected call of InstallMulticastRemoteReportFlows +func (mr *MockClientMockRecorder) InstallMulticastRemoteReportFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastRemoteReportFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastRemoteReportFlows), arg0) +} + // InstallMulticlusterClassifierFlows mocks base method func (m *MockClient) InstallMulticlusterClassifierFlows(arg0 uint32, arg1 bool) error { m.ctrl.T.Helper() @@ -670,6 +684,20 @@ func (mr *MockClientMockRecorder) SendIGMPQueryPacketOut(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPQueryPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPQueryPacketOut), arg0, arg1, arg2, arg3) } +// SendIGMPRemoteReportPacketOut mocks base method +func (m *MockClient) SendIGMPRemoteReportPacketOut(arg0 net.HardwareAddr, arg1 net.IP, arg2 util.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendIGMPRemoteReportPacketOut", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendIGMPRemoteReportPacketOut indicates an expected call of SendIGMPRemoteReportPacketOut +func (mr *MockClientMockRecorder) SendIGMPRemoteReportPacketOut(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendIGMPRemoteReportPacketOut", reflect.TypeOf((*MockClient)(nil).SendIGMPRemoteReportPacketOut), arg0, arg1, arg2) +} + // SendTCPPacketOut mocks base method func (m *MockClient) SendTCPPacketOut(arg0, arg1, arg2, arg3 string, arg4, arg5 uint32, arg6 bool, arg7, arg8 uint16, arg9 uint32, arg10 byte, arg11 func(openflow.PacketOutBuilder) openflow.PacketOutBuilder) error { m.ctrl.T.Helper() diff --git a/pkg/agent/route/route_linux.go 
b/pkg/agent/route/route_linux.go index 1514e226229..8f57d0d8040 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -57,6 +57,10 @@ const ( localAntreaFlexibleIPAMPodIPSet = "LOCAL-FLEXIBLE-IPAM-POD-IP" // localAntreaFlexibleIPAMPodIP6Set contains all AntreaFlexibleIPAM Pod IPv6s of this Node. localAntreaFlexibleIPAMPodIP6Set = "LOCAL-FLEXIBLE-IPAM-POD-IP6" + // clusterNodeIPSet contains all other Node IPs in the cluster. + clusterNodeIPSet = "CLUSTER-NODE-IP" + // clusterNodeIP6Set contains all other Node IP6s in the cluster. + clusterNodeIP6Set = "CLUSTER-NODE-IP6" // Antrea proxy NodePort IP antreaNodePortIPSet = "ANTREA-NODEPORT-IP" @@ -114,6 +118,10 @@ type Client struct { clusterIPv4CIDR *net.IPNet // clusterIPv6CIDR stores the calculated ClusterIP CIDR for IPv6. clusterIPv6CIDR *net.IPNet + // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster + clusterNodeIPs sync.Map + // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster + clusterNodeIP6s sync.Map } // NewClient returns a route client. 
@@ -339,6 +347,29 @@ func (c *Client) syncIPSet() error { } } + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + if err := ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil { + return err + } + if err := ipset.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true); err != nil { + return err + } + c.clusterNodeIPs.Range(func(_, v interface{}) bool { + ipsetEntry := v.(string) + if err := ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil { + return false + } + return true + }) + c.clusterNodeIP6s.Range(func(_, v interface{}) bool { + ipSetEntry := v.(string) + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return false + } + return true + }) + } + return nil } @@ -486,6 +517,7 @@ func (c *Client) syncIPTables() error { antreaPodIPSet, localAntreaFlexibleIPAMPodIPSet, antreaNodePortIPSet, + clusterNodeIPSet, config.VirtualNodePortDNATIPv4, config.VirtualServiceIPv4, snatMarkToIPv4, @@ -503,6 +535,7 @@ func (c *Client) syncIPTables() error { antreaPodIP6Set, localAntreaFlexibleIPAMPodIP6Set, antreaNodePortIP6Set, + clusterNodeIP6Set, config.VirtualNodePortDNATIPv6, config.VirtualServiceIPv6, snatMarkToIPv6, @@ -519,7 +552,8 @@ func (c *Client) syncIPTables() error { func (c *Client) restoreIptablesData(podCIDR *net.IPNet, podIPSet, localAntreaFlexibleIPAMPodIPSet, - nodePortIPSet string, + nodePortIPSet, + clusterNodeIPSet string, nodePortDNATVirtualIP, serviceVirtualIP net.IP, snatMarkToIP map[uint32]net.IP, @@ -559,6 +593,19 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, "-j", iptables.NoTrackTarget, }...) } + + if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + // Drop the multicast packets forwarded from other Nodes in the cluster. 
This is because + // the packet sent out from the sender Pod is already received via tunnel port with encap mode, + // and the one forwarded via the underlay network is to send to external receivers + writeLine(iptablesData, []string{ + "-A", antreaPreRoutingChain, + "-m", "comment", "--comment", `"Antrea: drop Pod multicast traffic forwarded via underlay network"`, + "-m", "set", "--match-set", clusterNodeIPSet, "src", + "-d", types.McastCIDR.String(), + "-j", iptables.DROPTarget, + }...) + } } writeLine(iptablesData, "COMMIT") @@ -1025,6 +1072,10 @@ func (c *Client) AddRoutes(podCIDR *net.IPNet, nodeName string, nodeIP, nodeGwIP c.nodeNeighbors.Store(podCIDRStr, neigh) } + if err := c.addNodeIP(podCIDR, nodeIP); err != nil { + return err + } + c.nodeRoutes.Store(podCIDRStr, routes) return nil } @@ -1048,6 +1099,9 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error { return err } } + if err := c.deleteNodeIP(podCIDR); err != nil { + return err + } } if podCIDR.IP.To4() == nil { neigh, exists := c.nodeNeighbors.Load(podCIDRStr) @@ -1478,6 +1532,62 @@ func (c *Client) DeleteLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) err return nil } +// addNodeIP adds nodeIP into the ipset when a new Node joins the cluster. +// The ipset is consumed with encap mode when multicast is enabled. +func (c *Client) addNodeIP(podCIDR *net.IPNet, nodeIP net.IP) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + if nodeIP == nil { + return nil + } + ipSetEntry := nodeIP.String() + if nodeIP.To4() != nil { + if err := ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Store(podCIDR.String(), ipSetEntry) + } else { + if err := ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Store(podCIDR.String(), ipSetEntry) + } + return nil +} + +// deleteNodeIP deletes NodeIPs from the ipset when a Node leaves the cluster. 
+// The ipset is consumed with encap mode when multicast is enabled. +func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { + if !c.multicastEnabled || !c.networkConfig.TrafficEncapMode.SupportsEncap() { + return nil + } + + podCIDRStr := podCIDR.String() + if podCIDR.IP.To4() != nil { + obj, exists := c.clusterNodeIPs.Load(podCIDRStr) + if !exists { + return nil + } + ipSetEntry := obj.(string) + if err := ipset.DelEntry(clusterNodeIPSet, ipSetEntry); err != nil { + return err + } + c.clusterNodeIPs.Delete(podCIDRStr) + } else { + obj, exists := c.clusterNodeIP6s.Load(podCIDRStr) + if !exists { + return nil + } + ipSetEntry := obj.(string) + if err := ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil { + return err + } + c.clusterNodeIP6s.Delete(podCIDRStr) + } + return nil +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/types/multicast.go b/pkg/agent/types/multicast.go index da06447af3b..ae910dea965 100644 --- a/pkg/agent/types/multicast.go +++ b/pkg/agent/types/multicast.go @@ -32,6 +32,7 @@ type IGMPNPRuleInfo struct { var ( McastAllHosts = net.ParseIP("224.0.0.1").To4() + IGMPv3Router = net.ParseIP("224.0.0.22").To4() _, McastCIDR, _ = net.ParseCIDR("224.0.0.0/4") ) diff --git a/pkg/agent/util/iptables/iptables.go b/pkg/agent/util/iptables/iptables.go index 113bc028927..780c8b5fa4c 100644 --- a/pkg/agent/util/iptables/iptables.go +++ b/pkg/agent/util/iptables/iptables.go @@ -36,6 +36,7 @@ const ( RawTable = "raw" AcceptTarget = "ACCEPT" + DROPTarget = "DROP" MasqueradeTarget = "MASQUERADE" MarkTarget = "MARK" ReturnTarget = "RETURN" diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 74e362e69b0..e55ceb2df47 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -342,6 +342,7 @@ type BucketBuilder interface { LoadRegRange(regID int, data uint32, rng *Range) 
BucketBuilder LoadToRegField(field *RegField, data uint32) BucketBuilder ResubmitToTable(tableID uint8) BucketBuilder + SetTunnelDst(addr net.IP) BucketBuilder Done() Group } diff --git a/pkg/ovs/openflow/ofctrl_group.go b/pkg/ovs/openflow/ofctrl_group.go index ee37a35adc5..b16495b0c4a 100644 --- a/pkg/ovs/openflow/ofctrl_group.go +++ b/pkg/ovs/openflow/ofctrl_group.go @@ -16,6 +16,7 @@ package openflow import ( "fmt" + "net" "antrea.io/libOpenflow/openflow13" "antrea.io/ofnet/ofctrl" @@ -128,6 +129,13 @@ func (b *bucketBuilder) ResubmitToTable(tableID uint8) BucketBuilder { return b } +// SetTunnelDst is an action to set tunnel destination address when the bucket is selected. +func (b *bucketBuilder) SetTunnelDst(addr net.IP) BucketBuilder { + setTunDstAct := &ofctrl.SetTunnelDstAction{IP: addr} + b.bucket.AddAction(setTunDstAct.GetActionMessage()) + return b +} + // Weight sets the weight of a bucket. func (b *bucketBuilder) Weight(val uint16) BucketBuilder { b.bucket.Weight = val diff --git a/pkg/util/k8s/node.go b/pkg/util/k8s/node.go index dd288e98fc2..5318406da56 100644 --- a/pkg/util/k8s/node.go +++ b/pkg/util/k8s/node.go @@ -145,3 +145,19 @@ func GetNodeAllAddrs(node *v1.Node) (ips sets.String, err error) { return } + +func GetNodeTransportAddrs(node *v1.Node) (*ip.DualStackIPs, error) { + transportAddrs, err := GetNodeAddrsFromAnnotations(node, types.NodeTransportAddressAnnotationKey) + if err != nil { + return nil, err + } + if transportAddrs != nil { + return transportAddrs, nil + } + // Use NodeIP if the transport IP address is not set or not found. 
+ nodeAddrs, err := GetNodeAddrs(node) + if err != nil { + return nil, err + } + return nodeAddrs, nil +} diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 136e9e43d6f..12e20284b09 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -30,8 +30,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/multicast" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + agentconfig "antrea.io/antrea/pkg/config/agent" + controllerconfig "antrea.io/antrea/pkg/config/controller" "antrea.io/antrea/pkg/features" ) @@ -56,6 +59,27 @@ func TestMulticast(t *testing.T) { if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } + t.Run("testMulticastWithNoEncap", func(t *testing.T) { + skipIfEncapModeIsNot(t, data, config.TrafficEncapModeNoEncap) + runMulticastTestCases(t, data, nodeMulticastInterfaces, true) + }) + t.Run("testMulticastWithEncap", func(t *testing.T) { + ac := func(config *agentconfig.AgentConfig) { + config.TrafficEncapMode = "encap" + config.FeatureGates["Multicast"] = true + config.Multicast.IGMPQueryInterval = "10s" + } + cc := func(config *controllerconfig.ControllerConfig) { + config.FeatureGates["Multicast"] = true + } + if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil { + t.Fatalf("Failed to deploy cluster with encap mode: %v", err) + } + runMulticastTestCases(t, data, nodeMulticastInterfaces, false) + }) +} + +func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ @@ -92,7 +116,7 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, 
mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) @@ -132,11 +156,12 @@ func TestMulticast(t *testing.T) { mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) }) } }) t.Run("testMulticastForwardToMultipleInterfaces", func(t *testing.T) { + skipIfEncapModeIsNot(t, data, config.TrafficEncapModeNoEncap) multipleInterfacesFound := false var nodeIdx int for i, ifaces := range nodeMulticastInterfaces { @@ -556,7 +581,7 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) @@ -602,19 +627,21 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc continue } for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) - if err != nil { - return false, err - } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. 
- if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { - if len(mRouteResult) == 0 { - return false, nil + if checkReceiverRoute { + _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) + if err != nil { + return false, err } - } else { - if len(mRouteResult) != 0 { - return false, nil + // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, + // the receivers should configure corresponding inbound multicast routes. + if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { + if len(mRouteResult) == 0 { + return false, nil + } + } else { + if len(mRouteResult) != 0 { + return false, nil + } } } _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String()))