diff --git a/ci/jenkins/external-hosts-config.yml b/ci/jenkins/external-hosts-config.yml new file mode 100644 index 00000000000..051db8bb626 --- /dev/null +++ b/ci/jenkins/external-hosts-config.yml @@ -0,0 +1,8 @@ +externalHosts: + - sshIP: 10.1.10.10 + name: antrea-multicast-external + interfaces: + - name: ens224 + ipv4: 10.1.10.10 + clusterInterfaces: + antrea-multicast-0-0: [ens224] diff --git a/ci/jenkins/test.sh b/ci/jenkins/test.sh index a92e79f643a..29f4cf986d8 100755 --- a/ci/jenkins/test.sh +++ b/ci/jenkins/test.sh @@ -53,13 +53,14 @@ _usage="Usage: $0 [--kubeconfig ] [--workdir ] Run K8s e2e community tests (Conformance & Network Policy) or Antrea e2e tests on a remote (Jenkins) Windows or Linux cluster. - --kubeconfig Path of cluster kubeconfig. - --workdir Home path for Go, vSphere information and antrea_logs during cluster setup. Default is $WORKDIR. - --testcase Windows install OVS, Conformance and Network Policy or Antrea e2e testcases on a Windows or Linux cluster. It can also be flexible ipam or multicast e2e test. - --registry The docker registry to use instead of dockerhub. - --proxyall Enable proxyAll to test AntreaProxy. - --testbed-type The testbed type to run tests. It can be flexible-ipam, jumper or legacy. - --ip-mode IP mode for flexible-ipam e2e test. Default is $DEFAULT_IP_MODE. It can also be ipv6 or ds." + --kubeconfig Path of cluster kubeconfig. + --workdir Home path for Go, vSphere information and antrea_logs during cluster setup. Default is $WORKDIR. + --testcase Windows install OVS, Conformance and Network Policy or Antrea e2e testcases on a Windows or Linux cluster. It can also be flexible ipam or multicast e2e test. + --registry The docker registry to use instead of dockerhub. + --proxyall Enable proxyAll to test AntreaProxy. + --testbed-type The testbed type to run tests. It can be flexible-ipam, jumper or legacy. + --ip-mode IP mode for flexible-ipam e2e test. Default is $DEFAULT_IP_MODE. It can also be ipv6 or ds. + --external-hosts-config-path The path of external host configuration file." function print_usage { echoerr "$_usage" @@ -78,6 +79,10 @@ case $key in KUBECONFIG_PATH="$2" shift 2 ;; + --external-hosts-config-path) + EXTERNAL_HOSTS_CONFIG_PATH="$2" + shift 2 + ;; --workdir) WORKDIR="$2" shift 2 @@ -120,6 +125,9 @@ if [[ "${IP_MODE}" != "${DEFAULT_IP_MODE}" && "${IP_MODE}" != "ipv6" && "${IP_MO echoerr "--ip-mode must be ipv4, ipv6 or ds" exit 1 fi + +EXTERNAL_HOSTS_CONFIG_PATH="ci/jenkins/external-hosts-config.yml" + if [[ "$WORKDIR" != "$DEFAULT_WORKDIR" && "$KUBECONFIG_PATH" == "$DEFAULT_KUBECONFIG_PATH" ]]; then KUBECONFIG_PATH=${WORKDIR}/.kube/config fi @@ -516,6 +524,21 @@ function deliver_antrea { fi } +function add_sshconfig_entry { + sshconfig_nodeip="$1" + sshconfig_nodename="$2" + cp ci/jenkins/ssh-config "${SSH_CONFIG_DST}.new" + sed -i "s/SSHCONFIGNODEIP/${sshconfig_nodeip}/g" "${SSH_CONFIG_DST}.new" + sed -i "s/SSHCONFIGNODENAME/${sshconfig_nodename}/g" "${SSH_CONFIG_DST}.new" + if [[ "${sshconfig_nodename}" =~ "win" ]]; then + sed -i "s/capv/administrator/g" "${SSH_CONFIG_DST}.new" + else + sed -i "s/capv/jenkins/g" "${SSH_CONFIG_DST}.new" + fi + echo " IdentityFile ${WORKDIR}/.ssh/id_rsa" >> "${SSH_CONFIG_DST}.new" + cat "${SSH_CONFIG_DST}.new" >> "${SSH_CONFIG_DST}" +} + function generate_ssh_config { echo "=== Generate ssh-config ===" SSH_CONFIG_DST="${WORKDIR}/.ssh/config" @@ -527,17 +550,14 @@ function generate_ssh_config { if [[ ! "${sshconfig_nodeip}" =~ ^[0-9]+(\.[0-9]+){3}$ ]];then sshconfig_nodeip="[${sshconfig_nodeip}]" fi - cp ci/jenkins/ssh-config "${SSH_CONFIG_DST}.new" - sed -i "s/SSHCONFIGNODEIP/${sshconfig_nodeip}/g" "${SSH_CONFIG_DST}.new" - sed -i "s/SSHCONFIGNODENAME/${sshconfig_nodename}/g" "${SSH_CONFIG_DST}.new" - if [[ "${sshconfig_nodename}" =~ "win" ]]; then - sed -i "s/capv/administrator/g" "${SSH_CONFIG_DST}.new" - else - sed -i "s/capv/jenkins/g" "${SSH_CONFIG_DST}.new" - fi - echo " IdentityFile ${WORKDIR}/.ssh/id_rsa" >> "${SSH_CONFIG_DST}.new" - cat "${SSH_CONFIG_DST}.new" >> "${SSH_CONFIG_DST}" + add_sshconfig_entry "$sshconfig_nodeip" "${sshconfig_nodename}" done + if [[ -n ${EXTERNAL_HOSTS_CONFIG_PATH} ]]; then + yq -r '.externalHosts.[] | {.name:.sshIP}' "$EXTERNAL_HOSTS_CONFIG_PATH" | while IFS=' :' read -r ssh_hostname ssh_ip; do + echo "adding ssh config for external host with hostname:$ssh_hostname and IP:$ssh_ip" + add_sshconfig_entry "$ssh_ip" "$ssh_hostname" + done + fi } function run_e2e { @@ -552,15 +572,17 @@ function run_e2e { mkdir -p "${WORKDIR}/.ssh" cp -f "${WORKDIR}/kube.conf" "${WORKDIR}/.kube/config" generate_ssh_config - + if [[ -n ${EXTERNAL_HOSTS_CONFIG_PATH} ]]; then + EXTERNAL_HOSTS_CONFIG_PATH="../../${EXTERNAL_HOSTS_CONFIG_PATH}" + fi set +e mkdir -p `pwd`/antrea-test-logs # HACK: see https://github.com/antrea-io/antrea/issues/2292 go mod edit -replace github.com/moby/spdystream=github.com/antoninbas/spdystream@v0.2.1 && go mod tidy if [[ $TESTBED_TYPE == "flexible-ipam" ]]; then - go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=100m --prometheus --antrea-ipam + go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --external-hosts-config-path "$EXTERNAL_HOSTS_CONFIG_PATH" --provider remote -timeout=100m --prometheus --antrea-ipam else - go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=100m --prometheus + go test -run=TestMulticast -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --external-hosts-config-path "$EXTERNAL_HOSTS_CONFIG_PATH" --provider remote -timeout=20m --prometheus fi if [[ "$?" != "0" ]]; then TEST_FAILURE=true @@ -614,10 +636,12 @@ function run_e2e_windows { mkdir -p "${WORKDIR}/.ssh" cp -f "${WORKDIR}/kube.conf" "${WORKDIR}/.kube/config" generate_ssh_config - + if [[ -n ${EXTERNAL_HOSTS_CONFIG_PATH} ]]; then + EXTERNAL_HOSTS_CONFIG_PATH="../../${EXTERNAL_HOSTS_CONFIG_PATH}" + fi set +e mkdir -p `pwd`/antrea-test-logs - go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --provider remote -timeout=50m --prometheus + go test -v antrea.io/antrea/test/e2e --logs-export-dir `pwd`/antrea-test-logs --external-hosts-config-path "$EXTERNAL_HOSTS_CONFIG_PATH" --provider remote -timeout=50m --prometheus if [[ "$?" != "0" ]]; then TEST_FAILURE=true fi diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 98f45b124c5..777fee43897 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -2987,12 +2987,11 @@ func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAd testNamespace := data.testNamespace mc := multicastTestcase{ name: caseName, - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, false}}, + receiverIndices: []int{1}, port: 3457, group: net.ParseIP(groupAddress), } - senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), testNamespace, mc.senderConfig.isHostNetwork) + senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(0), testNamespace, false) defer cleanupFunc() var wg sync.WaitGroup receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) @@ -3006,11 +3005,11 @@ func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAd data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() - tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createNetshootPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), testNamespace, true) + tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createNetshootPodOnNode, "test-tcpdump-", nodeName(mc.receiverIndices[0]), testNamespace, true) defer cleanupFunc() queryGroupAddress := "224.0.0.1" - cmd, err := generatePacketCaptureCmd(t, data, 15, queryGroupAddress, nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0]) + cmd, err := generatePacketCaptureCmd(t, data, 15, queryGroupAddress, nodeName(mc.receiverIndices[0]), receiverNames[0]) if err != nil { t.Fatalf("failed to call generateConnCheckCmd: %v", err) } @@ -3073,12 +3072,11 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g testNamespace := data.testNamespace mc := multicastTestcase{ name: caseName, - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, false}}, + receiverIndices: []int{1}, port: 3457, group: net.ParseIP(groupAddress), } - senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), testNamespace, mc.senderConfig.isHostNetwork) + senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(0), testNamespace, false) defer cleanupFunc() var wg sync.WaitGroup receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) @@ -3093,9 +3091,9 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() // check if receiver can receive multicast packet - tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createNetshootPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), testNamespace, true) + tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createNetshootPodOnNode, "test-tcpdump-", nodeName(mc.receiverIndices[0]), testNamespace, true) defer cleanupFunc() - cmd, err := generatePacketCaptureCmd(t, data, 5, mc.group.String(), nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0]) + cmd, err := generatePacketCaptureCmd(t, data, 5, mc.group.String(), nodeName(mc.receiverIndices[0]), receiverNames[0]) if err != nil { t.Fatalf("failed to call generateConnCheckCmd: %v", err) } diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index 4c5b3e62b8e..d7ab58754a4 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -93,6 +93,12 @@ func skipIfNotIPv4Cluster(tb testing.TB) { } } +func skipIfNoExternalHosts(tb testing.TB) { + if len(externalHostInfo.hosts) == 0 { + tb.Skipf("Skipping test as it requires external hosts info but the external hosts info is not set") + } +} + func skipIfIPv6Cluster(tb testing.TB) { if clusterInfo.podV6NetworkCIDR != "" { tb.Skipf("Skipping test as it is not supported in IPv6 cluster") diff --git a/test/e2e/framework.go b/test/e2e/framework.go index ea5c97218c1..37acff12111 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "math/rand" "net" "os" @@ -154,6 +155,15 @@ type ClusterNode struct { os string } +type ExternalHost struct { + Name string `yaml:"name"` + Interfaces []ExternalHostInterface `yaml:"interfaces,flow"` +} + +type ExternalHostInterface struct { + Name string `name` +} + func (n ClusterNode) ip() string { if n.ipv4Addr != "" { return n.ipv4Addr @@ -180,17 +190,28 @@ type ClusterInfo struct { var clusterInfo ClusterInfo +type ExternalHostsConfig struct { + ExternalHosts []ExternalHost `yaml:"externalHosts,flow"` +} + +var externalHostInfo ExternalHostInfo + +type ExternalHostInfo struct { + hosts map[int]ExternalHost +} + type TestOptions struct { - providerName string - providerConfigPath string - logsExportDir string - logsExportOnSuccess bool - withBench bool - enableCoverage bool - enableAntreaIPAM bool - flowVisibility bool - coverageDir string - skipCases string + providerName string + providerConfigPath string + logsExportDir string + logsExportOnSuccess bool + withBench bool + enableCoverage bool + enableAntreaIPAM bool + flowVisibility bool + coverageDir string + skipCases string + externalHostsConfigPath string } var testOptions TestOptions @@ -359,6 +380,14 @@ func nodeName(idx int) string { return node.name } +func externalHostName(idx int) string { + host, ok := externalHostInfo.hosts[idx] + if !ok { + return "" + } + return host.Name +} + // nodeIP returns an empty string if there is no Node with the provided idx. If idx is 0, the IP // of the control-plane Node will be returned. func nodeIP(idx int) string { @@ -425,6 +454,29 @@ func (data *TestData) RunCommandOnNodeExt(nodeName, cmd string, envs map[string] return data.provider.RunCommandOnNodeExt(nodeName, cmd, envs, stdin, sudo) } +func (data *TestData) collectExternalHostsInfo(path string) error { + externalHostInfo = ExternalHostInfo{} + externalHostInfo.hosts = make(map[int]ExternalHost) + filePath, _ := filepath.Abs(path) + yamlFile, err := ioutil.ReadFile(filePath) + + if err != nil { + panic(err) + } + + var config ExternalHostsConfig + + err = yaml.Unmarshal(yamlFile, &config) + if err != nil { + panic(err) + } + for i, v := range config.ExternalHosts { + externalHostInfo.hosts[i] = v + fmt.Printf("The information of external host index %d is: %+v\n", i, externalHostInfo.hosts[i]) + } + return nil +} + func (data *TestData) collectClusterInfo() error { // retrieve K8s server version // this needs to be done first, as there may be dependencies on the diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index cad7fc67f0b..d05f8360990 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -75,6 +75,7 @@ func (tOptions *TestOptions) setupCoverage() func() { // testMain is meant to be called by TestMain and enables the use of defer statements. func testMain(m *testing.M) int { flag.StringVar(&testOptions.providerName, "provider", "vagrant", "K8s test cluster provider") + flag.StringVar(&testOptions.externalHostsConfigPath, "external-hosts-config-path", "", "Path of external hosts config file") flag.StringVar(&testOptions.providerConfigPath, "provider-cfg-path", "", "Optional config file for provider") flag.StringVar(&testOptions.logsExportDir, "logs-export-dir", "", "Export directory for test logs") flag.BoolVar(&testOptions.logsExportOnSuccess, "logs-export-on-success", false, "Export logs even when a test is successful") @@ -105,6 +106,12 @@ func testMain(m *testing.M) int { if err := testData.collectClusterInfo(); err != nil { log.Fatalf("Error when collecting information about K8s cluster: %v", err) } + if testOptions.externalHostsConfigPath != "" { + log.Println("Collecting external information about K8s cluster") + if err := testData.collectExternalHostsInfo(testOptions.externalHostsConfigPath); err != nil { + log.Fatalf("Error when collecting information about external hosts: %v", err) + } + } if clusterInfo.podV4NetworkCIDR != "" { log.Printf("Pod IPv4 network: '%s'", clusterInfo.podV4NetworkCIDR) } diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index 136e9e43d6f..5eaa99ff156 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -40,6 +40,8 @@ func skipIfMulticastDisabled(tb testing.TB) { } var igmpQueryType = int32(0x11) +var externalHostIdx = 0 +var externalHostIface string func TestMulticast(t *testing.T) { skipIfHasWindowsNodes(t) @@ -56,37 +58,83 @@ func TestMulticast(t *testing.T) { if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } - t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { + if len(externalHostInfo.hosts) != 0 { + externalHostIface, err = getMulticastExternalHostIface(externalHostIdx) + if err != nil { + t.Fatalf("Error getting external host interface: %v", err) + } + } + t.Run("testMulticastWithNoEncap", func(t *testing.T) { + runMulticastTestCases(t, data, nodeMulticastInterfaces) + }) +} + +func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string) { + t.Run("testMulticastBetweenPodsInTwoNodesWithExternalHost", func(t *testing.T) { + skipIfNoExternalHosts(t) + skipIfNumNodesLessThan(t, 2) + testcases := []multicastTestcase{ + { + name: "testMulticastTrafficFromExternal", + receiverIndices: []int{0}, + externalReceiver: false, + externalSender: true, + port: 3458, + group: net.ParseIP("224.3.4.7"), + }, + { + name: "testMulticastTrafficToExternal", + receiverIndices: []int{0}, + externalReceiver: true, + externalSender: false, + port: 3459, + group: net.ParseIP("224.3.4.8"), + }, + } + for _, mc := range testcases { + mc := mc + t.Run(mc.name, func(t *testing.T) { + t.Parallel() + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + }) + } + }) + t.Run("testMulticastBetweenPodsInThreeNodesWithExternalHost", func(t *testing.T) { + skipIfNoExternalHosts(t) + skipIfNumNodesLessThan(t, 3) + testcases := []multicastTestcase{ + { + name: "testMulticastMultipleReceiversTrafficToExternal", + receiverIndices: []int{1, 2}, + externalReceiver: true, + externalSender: false, + port: 3463, + group: net.ParseIP("224.3.4.12"), + }, + } + for _, mc := range testcases { + mc := mc + t.Run(mc.name, func(t *testing.T) { + t.Parallel() + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces) + }) + } + }) + t.Run("testMulticastBetweenPodsInTwoNodesInCluster", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ { name: "testMulticastForLocalPods", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{0, false}}, + receiverIndices: []int{0}, port: 3456, group: net.ParseIP("224.3.4.5"), }, { name: "testMulticastForInterNodePods", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, false}}, + receiverIndices: []int{1}, port: 3457, group: net.ParseIP("224.3.4.6"), }, - { - name: "testMulticastTrafficFromExternal", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: true}, - receiverConfigs: []multicastTestPodConfig{{1, false}}, - port: 3458, - group: net.ParseIP("224.3.4.7"), - }, - { - name: "testMulticastTrafficToExternal", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, true}}, - port: 3459, - group: net.ParseIP("224.3.4.8"), - }, } for _, mc := range testcases { mc := mc @@ -96,37 +144,21 @@ func TestMulticast(t *testing.T) { }) } }) - t.Run("testMulticastBetweenPodsInThreeNodes", func(t *testing.T) { + t.Run("testMulticastBetweenPodsInThreeNodesInCluster", func(t *testing.T) { skipIfNumNodesLessThan(t, 3) testcases := []multicastTestcase{ { name: "testMulticastMultipleReceiversOnSameNode", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{0, false}, {0, false}}, + receiverIndices: []int{0, 0}, port: 3460, group: net.ParseIP("224.3.4.9"), }, { name: "testMulticastMultipleReceiversForInterNodePods", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, false}, {2, false}}, + receiverIndices: []int{1, 2}, port: 3461, group: net.ParseIP("224.3.4.10"), }, - { - name: "testMulticastMultipleReceiversTrafficFromExternal", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: true}, - receiverConfigs: []multicastTestPodConfig{{1, false}, {2, true}}, - port: 3462, - group: net.ParseIP("224.3.4.11"), - }, - { - name: "testMulticastMultipleReceiversTrafficToExternal", - senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, - receiverConfigs: []multicastTestPodConfig{{1, true}, {2, false}}, - port: 3463, - group: net.ParseIP("224.3.4.12"), - }, } for _, mc := range testcases { mc := mc @@ -264,17 +296,13 @@ func TestMulticast(t *testing.T) { }) } -type multicastTestPodConfig struct { - nodeIdx int - isHostNetwork bool -} - type multicastTestcase struct { - name string - senderConfig multicastTestPodConfig - receiverConfigs []multicastTestPodConfig - port int - group net.IP + name string + receiverIndices []int + port int + externalReceiver bool + externalSender bool + group net.IP } type multicastStatsTestcase struct { @@ -365,6 +393,8 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error when waiting for ANP %s to be realized: %v", np.Name, err) } + defer data.DeleteANP(data.testNamespace, np.Name) + } for _, anp := range mc.igmpANPConfigs { @@ -403,6 +433,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error when waiting for ANP %s released: %v", np.Name, err) } + defer data.DeleteANP(data.testNamespace, np.Name) } for _, receiverConfig := range mc.receiverConfigs { @@ -556,59 +587,58 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } +// This test assumes there is only one multicast sender in the network, +// which can be a pod sender located in a node with multiple multicast interfaces(if no node has more than one external interface, it defaults to the first node) +// or a sender from an external host. func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string) { + var multipleMcastIfaceNodeIdx = 0 + for nodeIdx, iface := range nodeMulticastInterfaces { + if len(iface) > 1 { + multipleMcastIfaceNodeIdx = nodeIdx + } + } mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) - senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), data.testNamespace, mc.senderConfig.isHostNetwork) - defer cleanupFunc() + var wg sync.WaitGroup _, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) for _, cleanupFunc := range cleanupFuncs { defer cleanupFunc() } - // Wait 2 seconds(-w 2) before sending multicast traffic. - // It sends two multicast packets for every second(-f 500 means it takes 500 milliseconds for sending one packet). - sendMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -f 500 -o -p %d -s -t 3 -w 2 -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} - go func() { - data.RunCommandFromPod(data.testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) - }() + cleanupFunc := setupSender(t, data, mc, mcjoinWaitTimeout, multipleMcastIfaceNodeIdx) + defer cleanupFunc() readyReceivers := sets.NewInt() senderReady := false if err := wait.Poll(3*time.Second, defaultTimeout, func() (bool, error) { - if !senderReady { + if !mc.externalSender && !senderReady { + _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(multipleMcastIfaceNodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[multipleMcastIfaceNodeIdx], " "))) // Sender pods should add an outbound multicast route except running as HostNetwork. - _, mrouteResult, _, err := data.RunCommandOnNode(nodeName(mc.senderConfig.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s | grep '%s'", mc.group.String(), gatewayInterface, strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " "))) if err != nil { return false, err } - if !mc.senderConfig.isHostNetwork { - if len(mrouteResult) == 0 { - return false, nil - } - } else { - if len(mrouteResult) != 0 { - return false, nil - } + if len(mrouteResult) == 0 { + return false, nil } senderReady = true } - // Check inbound multicast route and whether multicast interfaces has joined the multicast group. - for _, receiver := range mc.receiverConfigs { - if readyReceivers.Has(receiver.nodeIdx) { + for _, receiverIdx := range mc.receiverIndices { + if readyReceivers.Has(receiverIdx) { continue } - for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) + for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiverIdx] { + + _, mRouteResult, _, err := data.RunCommandOnNode(nodeName(receiverIdx), fmt.Sprintf("ip mroute show to %s iif %s ", mc.group.String(), receiverMulticastInterface)) if err != nil { return false, err } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. - if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { + // If multicast traffic is sent from an external host, the multicast route will be configured on the node with multiple multicast interfaces + // because the multi-mcast node and external host connect to the same gateway. + // If sender-receivers are located in different nodes and sender is not external host, the receivers should configure corresponding inbound multicast routes. + if (mc.externalSender && receiverIdx == multipleMcastIfaceNodeIdx && receiverMulticastInterface == externalHostIface) || (!mc.externalSender && receiverIdx != multipleMcastIfaceNodeIdx) { if len(mRouteResult) == 0 { return false, nil } @@ -617,24 +647,18 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc return false, nil } } - _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiver.nodeIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String())) + _, mAddrResult, _, err := data.RunCommandOnNode(nodeName(receiverIdx), fmt.Sprintf("ip maddr show %s | grep %s", receiverMulticastInterface, mc.group.String())) if err != nil { return false, err } // The receivers should also join multicast group. // Note that in HostNetwork mode, the "join multicast" action is taken by mcjoin, // which will not persist after mcjoin exits. - if !receiver.isHostNetwork { - if len(mAddrResult) == 0 { - return false, nil - } - } else { - if len(mAddrResult) != 0 { - return false, nil - } + if len(mAddrResult) == 0 { + return false, nil } } - readyReceivers = readyReceivers.Insert(receiver.nodeIdx) + readyReceivers = readyReceivers.Insert(receiverIdx) } return true, nil }); err != nil { @@ -643,11 +667,28 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc wg.Wait() } +func setupSender(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWaitTimeout time.Duration, senderNodeIdx int) func() { + senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(senderNodeIdx), data.testNamespace, false) + + go func() { + if mc.externalSender { + // Wait 2 seconds(-w 2) before sending multicast traffic. + // It sends two multicast packets for every second(-f 500 means it takes 500 milliseconds for sending one packet). + data.RunCommandOnNode(externalHostName(externalHostIdx), fmt.Sprintf("mcjoin -i %s -f 500 -o -p %d -s -t 30 -w 2 -W %d %s", externalHostIface, mc.port, mcjoinWaitTimeout, mc.group.String())) + } else { + sendMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -f 500 -o -p %d -s -t 30 -w 2 -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} + data.RunCommandFromPod(data.testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) + } + }() + + return cleanupFunc +} + func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWaitTimeout time.Duration, wg *sync.WaitGroup) ([]string, []func()) { receiverNames := make([]string, 0) cleanupFuncs := []func(){} - for _, receiver := range mc.receiverConfigs { - receiverName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-receiver-", nodeName(receiver.nodeIdx), data.testNamespace, receiver.isHostNetwork) + for _, receiverIdx := range mc.receiverIndices { + receiverName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-receiver-", nodeName(receiverIdx), data.testNamespace, false) receiverNames = append(receiverNames, receiverName) cleanupFuncs = append(cleanupFuncs, cleanupFunc) } @@ -665,6 +706,15 @@ func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWa assert.Contains(t, res, "Total: 10 packets") }() } + if mc.externalReceiver { + wg.Add(1) + go func() { + defer wg.Done() + _, res, _, err := data.RunCommandOnNode(externalHostName(externalHostIdx), fmt.Sprintf("mcjoin -i %s -c 10 -o -p %d -W %d %s", externalHostIface, mc.port, mcjoinWaitTimeout, mc.group.String())) + failOnError(err, t) + assert.Contains(t, res, "Total: 10 packets") + }() + } return receiverNames, cleanupFuncs } @@ -707,3 +757,14 @@ func checkAntctlResult(t *testing.T, data *TestData, antreaPodName, containerPod match, _ := regexp.MatchString(fmt.Sprintf("%s[[:space:]]+%s[[:space:]]+%d[[:space:]]+%d", data.testNamespace, containerPodName, inbound, outbound), strings.TrimSpace(stdout)) return match, nil } + +func getMulticastExternalHostIface(externalHostIdx int) (string, error) { + host, ok := externalHostInfo.hosts[externalHostIdx] + if !ok { + return "", fmt.Errorf("cannot find external host with index %d", externalHostIdx) + } + for _, i := range host.Interfaces { + return i.Name, nil + } + return "", fmt.Errorf("failed to get interface for external host %s", host.Name) +}