Skip to content

Commit

Permalink
Fix Flow Aggregator e2e test failure because of time difference
Browse files Browse the repository at this point in the history
In this PR, we change to comparing flow export time with flow start time
in flow record instead of the test start time of the host running tests.
Fix issue #2980

Signed-off-by: Yongming Ding <[email protected]>
  • Loading branch information
Yongming Ding committed Nov 5, 2021
1 parent 0feed23 commit e0b68d4
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,6 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs
}

func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP string, isIPv6 bool, isIntraNode bool, checkService bool, checkK8sNetworkPolicy bool, checkAntreaNetworkPolicy bool, checkBandwidth bool) {
timeStart := time.Now()
timeStartSec := timeStart.Unix()
var cmdStr string
if !isIPv6 {
cmdStr = fmt.Sprintf("iperf3 -c %s -t %d", dstIP, iperfTimeSec)
Expand All @@ -528,7 +526,7 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
t.Fatalf("Unit of the traffic bandwidth reported by iperf should either be Mbits or Gbits, failing the test.")
}

collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, timeStart, checkService, true, isIPv6)
collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6)
// Iterate over recordSlices and build some results to test with expected results
dataRecordsCount := 0
var octetTotalCount uint64
Expand Down Expand Up @@ -582,13 +580,14 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
if checkBandwidth && !strings.Contains(record, "octetDeltaCount: 0") {
exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds"))
curOctetTotalCount := getUnit64FieldFromRecord(t, record, "octetTotalCountFromSourceNode")
flowStartTime := int64(getUnit64FieldFromRecord(t, record, "flowStartSeconds"))
if curOctetTotalCount > octetTotalCount {
octetTotalCount = curOctetTotalCount
}
curOctetDeltaCount := getUnit64FieldFromRecord(t, record, "octetDeltaCountFromSourceNode")
// Check the bandwidth using octetDeltaCountFromSourceNode, if this record
// is neither the first record nor the last in the stream of records.
if curOctetDeltaCount != curOctetTotalCount && exportTime < timeStartSec+iperfTimeSec {
if curOctetDeltaCount != curOctetTotalCount && exportTime < flowStartTime+iperfTimeSec {
t.Logf("Check the bandwidth using octetDeltaCountFromSourceNode %d in data record.", curOctetDeltaCount)
// This middle record should aggregate two records from Flow Exporter
checkBandwidthByInterval(t, bandwidthInMbps, curOctetDeltaCount, float64(2*exporterActiveFlowExportTimeout/time.Second), "octetDeltaCountFromSourceNode")
Expand All @@ -607,7 +606,6 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
}

func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool) {
timeStart := time.Now()
var cmd string
if !isIPv6 {
cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort)
Expand All @@ -617,7 +615,7 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
stdout, stderr, err := data.runCommandFromPod(testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd))
require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr)

_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", timeStart, false, false, isIPv6)
_, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6)
for _, record := range recordSlices {
if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) {
checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "")
Expand All @@ -632,7 +630,6 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st
}

func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6 bool, isIntraNode bool, isANP bool) {
timeStart := time.Now()
var cmdStr1, cmdStr2 string
if !isIPv6 {
cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow1.dstIP)
Expand All @@ -646,8 +643,8 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2
_, _, err = data.runCommandFromPod(testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2})
assert.Error(t, err)

_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", timeStart, false, false, isIPv6)
_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", timeStart, false, false, isIPv6)
_, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6)
_, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6)
recordSlices := append(recordSlices1, recordSlices2...)
src_flow1, dst_flow1 := matchSrcAndDstAddress(testFlow1.srcIP, testFlow1.dstIP, false, isIPv6)
src_flow2, dst_flow2 := matchSrcAndDstAddress(testFlow2.srcIP, testFlow2.dstIP, false, isIPv6)
Expand Down Expand Up @@ -758,7 +755,7 @@ func getUnit64FieldFromRecord(t *testing.T, record string, field string) uint64
// received all the expected records for a given flow with source IP, destination IP
// and source port. We send source port to ignore the control flows during the
// iperf test.
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart time.Time, isDstService bool, checkAllRecords bool, isIPv6 bool) (string, []string) {
func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService bool, checkAllRecords bool, isIPv6 bool) (string, []string) {
var collectorOutput string
var recordSlices []string
err := wait.PollImmediate(500*time.Millisecond, aggregatorInactiveFlowRecordTimeout, func() (bool, error) {
Expand All @@ -774,9 +771,10 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart ti
src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6)
if checkAllRecords {
for _, record := range recordSlices {
flowStartTime := int64(getUnit64FieldFromRecord(t, record, "flowStartSeconds"))
exportTime := int64(getUnit64FieldFromRecord(t, record, "flowEndSeconds"))
if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) {
if exportTime >= timeStart.Unix()+iperfTimeSec {
if exportTime >= flowStartTime+iperfTimeSec {
return true, nil
}
}
Expand All @@ -785,7 +783,7 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, timeStart ti
}
return strings.Contains(collectorOutput, src) && strings.Contains(collectorOutput, dst) && strings.Contains(collectorOutput, srcPort), nil
})
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v time start: %s iperf source port: %s", collectorOutput, timeStart.String(), srcPort)
require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort)
return collectorOutput, recordSlices
}

Expand Down

0 comments on commit e0b68d4

Please sign in to comment.