Skip to content

Commit

Permalink
Clear original export fields in the aggregation process
Browse files Browse the repository at this point in the history
Original export fields should be removed in the intermediate process
in their current state.
In cases, where correlated flow records are there, current original export
fields do not make sense as records can come from two different
exporters.

More details in an Antrea issue, who is main user of go-ipfix library:
antrea-io/antrea#2336
  • Loading branch information
srikartati committed Jul 2, 2021
1 parent 843354a commit 68560fc
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 112 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
google.golang.org/protobuf v1.26.0
google.golang.org/protobuf v1.27.1
k8s.io/apimachinery v0.21.0
k8s.io/component-base v0.21.0
k8s.io/klog/v2 v2.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
73 changes: 4 additions & 69 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) er
if set.GetSetType() != entities.Data { // only process data records
return nil
}
isExporterIPv4, err := addOriginalExporterInfo(message)
if err != nil {
return err
}

records := set.GetRecords()
invalidRecs := 0
for _, record := range records {
Expand All @@ -151,7 +148,7 @@ func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) er
if err != nil {
return err
}
if err = a.addOrUpdateRecordInMap(flowKey, record, isIPv4, isExporterIPv4); err != nil {
if err = a.addOrUpdateRecordInMap(flowKey, record, isIPv4); err != nil {
return err
}
}
Expand Down Expand Up @@ -286,13 +283,9 @@ func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord
return record.isIPv4
}

func (a *AggregationProcess) IsExporterOfAggregatedRecordIPv4(record AggregationFlowRecord) bool {
return record.isExporterIPv4
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
// flowKeyMap by doing correlation or updating the stats.
func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record entities.Record, isIPv4, isExporterIPv4 bool) error {
func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record entities.Record, isIPv4 bool) error {
a.mutex.Lock()
defer a.mutex.Unlock()

Expand Down Expand Up @@ -359,8 +352,8 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
ReadyToSend: false,
waitForReadyToSendRetries: 0,
isIPv4: isIPv4,
isExporterIPv4: isExporterIPv4,
}

if !correlationRequired {
aggregationRecord.ReadyToSend = true
// If no correlation is required for an Inter-Node record, K8s metadata is
Expand Down Expand Up @@ -742,64 +735,6 @@ func getFlowKeyFromRecord(record entities.Record) (*FlowKey, bool, error) {
return flowKey, isSrcIPv4Filled && isDstIPv4Filled, nil
}

// addOriginalExporterInfo adds originalExporterIP and originalObservationDomainId
// to records in message set. It returns whether exportIP is IPv4(true) or IPv6(false).
func addOriginalExporterInfo(message *entities.Message) (bool, error) {
isIPv4 := false
exporterIP := net.ParseIP(message.GetExportAddress())
if exporterIP.To4() != nil {
isIPv4 = true
}
set := message.GetSet()
records := set.GetRecords()
for _, record := range records {
var originalExporterIP, originalObservationDomainId *entities.InfoElementWithValue
var ie *entities.InfoElement
var err error
// Add originalExporterIP. Supports both IPv4 and IPv6.
if isIPv4 {
ie, err = registry.GetInfoElement("originalExporterIPv4Address", registry.IANAEnterpriseID)
} else {
ie, err = registry.GetInfoElement("originalExporterIPv6Address", registry.IANAEnterpriseID)
}
if err != nil {
return isIPv4, err
}

var value []byte
if isIPv4 {
value, err = entities.EncodeToIEDataType(entities.Ipv4Address, net.ParseIP(message.GetExportAddress()).To4())
} else {
value, err = entities.EncodeToIEDataType(entities.Ipv6Address, net.ParseIP(message.GetExportAddress()).To16())
}
if err != nil {
return isIPv4, fmt.Errorf("error when encoding originalExporterIP: %v", err)
}
originalExporterIP = entities.NewInfoElementWithValue(ie, value)

err = record.AddInfoElement(originalExporterIP)
if err != nil {
return isIPv4, err
}

// Add originalObservationDomainId
ie, err = registry.GetInfoElement("originalObservationDomainId", registry.IANAEnterpriseID)
if err != nil {
return isIPv4, fmt.Errorf("IANA Registry is not loaded correctly with originalObservationDomainId")
}
value, err = entities.EncodeToIEDataType(entities.Unsigned32, message.GetObsDomainID())
if err != nil {
return isIPv4, fmt.Errorf("error when encoding originalObservationDomainId: %v", err)
}
originalObservationDomainId = entities.NewInfoElementWithValue(ie, value)
err = record.AddInfoElement(originalObservationDomainId)
if err != nil {
return isIPv4, err
}
}
return isIPv4, nil
}

func validateDataRecord(record entities.Record) bool {
for _, element := range record.GetOrderedElementList() {
if element.Value == nil {
Expand Down
45 changes: 8 additions & 37 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,35 +487,6 @@ func TestAggregationProcess(t *testing.T) {
assert.Equalf(t, aggRecord.Record, dataMsg.GetSet().GetRecords()[0], "records should be equal")
}

func TestAddOriginalExporterInfo(t *testing.T) {
message := createDataMsgForSrc(t, false, false, false, false, false)
isIPv4, err := addOriginalExporterInfo(message)
assert.NoError(t, err)
assert.True(t, isIPv4)
record := message.GetSet().GetRecords()[0]
ieWithValue, exist := record.GetInfoElementWithValue("originalExporterIPv4Address")
assert.Equal(t, true, exist)
assert.Equal(t, net.IP{0x7f, 0x0, 0x0, 0x1}, ieWithValue.Value)
ieWithValue, exist = record.GetInfoElementWithValue("originalObservationDomainId")
assert.Equal(t, true, exist)
assert.Equal(t, uint32(1234), ieWithValue.Value)
}

func TestAddOriginalExporterInfoIPv6(t *testing.T) {
// Test message with data set
message := createDataMsgForSrc(t, true, false, false, false, false)
isIPv4, err := addOriginalExporterInfo(message)
assert.NoError(t, err)
assert.False(t, isIPv4)
record := message.GetSet().GetRecords()[0]
ieWithValue, exist := record.GetInfoElementWithValue("originalExporterIPv6Address")
assert.Equal(t, true, exist)
assert.Equal(t, net.IP{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, ieWithValue.Value)
ieWithValue, exist = record.GetInfoElementWithValue("originalObservationDomainId")
assert.Equal(t, true, exist)
assert.Equal(t, uint32(1234), ieWithValue.Value)
}

func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
Expand Down Expand Up @@ -729,7 +700,7 @@ func TestGetExpiryFromExpirePriorityQueue(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
for _, record := range tc.records {
flowKey, isIPv4, _ := getFlowKeyFromRecord(record)
err := ap.addOrUpdateRecordInMap(flowKey, record, isIPv4, true)
err := ap.addOrUpdateRecordInMap(flowKey, record, isIPv4)
assert.NoError(t, err)
}
expiryTime := ap.GetExpiryFromExpirePriorityQueue()
Expand Down Expand Up @@ -809,7 +780,7 @@ func TestForAllExpiredFlowRecordsDo(t *testing.T) {
numExecutions = 0
for _, record := range tc.records {
flowKey, isIPv4, _ := getFlowKeyFromRecord(record)
err := ap.addOrUpdateRecordInMap(flowKey, record, isIPv4, true)
err := ap.addOrUpdateRecordInMap(flowKey, record, isIPv4)
assert.NoError(t, err)
}
switch tc.name {
Expand Down Expand Up @@ -845,15 +816,15 @@ func TestForAllExpiredFlowRecordsDo(t *testing.T) {

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, needsCorrleation bool) {
flowKey1, isIPv4, _ := getFlowKeyFromRecord(record1)
err := ap.addOrUpdateRecordInMap(flowKey1, record1, isIPv4, true)
err := ap.addOrUpdateRecordInMap(flowKey1, record1, isIPv4)
assert.NoError(t, err)
item := ap.expirePriorityQueue.Peek()
oldActiveExpiryTime := item.activeExpireTime
oldInactiveExpiryTime := item.inactiveExpireTime
if !isIntraNode && needsCorrleation {
flowKey2, isIPv4, _ := getFlowKeyFromRecord(record2)
assert.Equalf(t, *flowKey1, *flowKey2, "flow keys should be equal.")
err = ap.addOrUpdateRecordInMap(flowKey2, record2, isIPv4, true)
err = ap.addOrUpdateRecordInMap(flowKey2, record2, isIPv4)
assert.NoError(t, err)
}
assert.Equal(t, 1, len(ap.flowKeyRecordMap))
Expand Down Expand Up @@ -897,20 +868,20 @@ func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1,

func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecord, dstRecord, srcRecordLatest, dstRecordLatest entities.Record, isIntraNode bool) {
flowKey, isIPv4, _ := getFlowKeyFromRecord(srcRecord)
err := ap.addOrUpdateRecordInMap(flowKey, srcRecord, isIPv4, true)
err := ap.addOrUpdateRecordInMap(flowKey, srcRecord, isIPv4)
assert.NoError(t, err)
item := ap.expirePriorityQueue.Peek()
oldActiveExpiryTime := item.activeExpireTime
oldInactiveExpiryTime := item.inactiveExpireTime

if !isIntraNode {
err = ap.addOrUpdateRecordInMap(flowKey, dstRecord, isIPv4, true)
err = ap.addOrUpdateRecordInMap(flowKey, dstRecord, isIPv4)
assert.NoError(t, err)
}
err = ap.addOrUpdateRecordInMap(flowKey, srcRecordLatest, isIPv4, true)
err = ap.addOrUpdateRecordInMap(flowKey, srcRecordLatest, isIPv4)
assert.NoError(t, err)
if !isIntraNode {
err = ap.addOrUpdateRecordInMap(flowKey, dstRecordLatest, isIPv4, true)
err = ap.addOrUpdateRecordInMap(flowKey, dstRecordLatest, isIPv4)
assert.NoError(t, err)
}
assert.Equal(t, 1, len(ap.flowKeyRecordMap))
Expand Down
3 changes: 0 additions & 3 deletions pkg/intermediate/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ type AggregationFlowRecord struct {
// isIPv4 indicates whether the source and destination addresses are IPv4 or
// IPv6 in the aggregated flow record.
isIPv4 bool
// isExporterIPv4 indicates whether the exporter address of the received flow
// aggregator is IPv4 or IPv6.
isExporterIPv4 bool
}

type AggregationElements struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/producer/protobuf/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/test/collector_intermediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) {
assert.NotNil(t, flowKeyRecordMap[flowKey1])
record = flowKeyRecordMap[flowKey1].Record
}
assert.Equal(t, 27, len(record.GetOrderedElementList()))
assert.Equal(t, 25, len(record.GetOrderedElementList()))
for _, element := range record.GetOrderedElementList() {
switch element.Element.Name {
case "sourcePodName":
Expand Down

0 comments on commit 68560fc

Please sign in to comment.