Skip to content

Commit

Permalink
Remove original exporter ip and obs domain ID from flow aggregator (#…
Browse files Browse the repository at this point in the history
…2361)

This commit removes originalExporterIPv4Address,
originalExporterIPv6Address and originalObservationDomainId from
flow aggregator and related templates for these fields.
It also bumps go-ipfix to v0.5.4.

Fixes #2336

Signed-off-by: zyiou <[email protected]>
  • Loading branch information
zyiou authored Jul 9, 2021
1 parent 1588b16 commit 4895f6a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 125 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ else
manifest_args="$manifest_args --no-np"
fi

COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.3")
COMMON_IMAGES_LIST=("gcr.io/kubernetes-e2e-test-images/agnhost:2.8" "projects.registry.vmware.com/library/busybox" "projects.registry.vmware.com/antrea/nginx" "projects.registry.vmware.com/antrea/perftool" "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.4")
for image in "${COMMON_IMAGES_LIST[@]}"; do
for i in `seq 3`; do
docker pull $image && break
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/ti-mo/conntrack v0.3.0
github.com/vishvananda/netlink v1.1.0
github.com/vmware/go-ipfix v0.5.3
github.com/vmware/go-ipfix v0.5.4
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/mod v0.4.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -625,8 +625,8 @@ github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYp
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vmware/go-ipfix v0.5.3 h1:ZJTn5vQd6W0WWt05gm+nNFjjgbgVfUkvywdxKmWj4uM=
github.com/vmware/go-ipfix v0.5.3/go.mod h1:SF6BrZTPvoVdzgmjJvshoegBVbicn4xWlkoCNADab6E=
github.com/vmware/go-ipfix v0.5.4 h1:n7TssKO8D4E3qpFmO6eDs7yU9Mr4fSbLFC+GstPU8kw=
github.com/vmware/go-ipfix v0.5.4/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/wenyingd/ofnet v0.0.0-20210526054554-3e71e19fd0cf h1:EEGpnM6W07pq2nKdqk+lig1Qit5f8eUe+Vt1ditTLgk=
github.com/wenyingd/ofnet v0.0.0-20210526054554-3e71e19fd0cf/go.mod h1:tZiqxY3POhek8GrqcmU+5bvVzDwY1zZ7Wh9+zwaoV3s=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
Expand Down Expand Up @@ -945,8 +945,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
87 changes: 18 additions & 69 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ var (
"tcpState",
"flowType",
}
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)
aggregatorElementsCommon = []string{
"originalObservationDomainId",
}
aggregatorElementsIPv4 = append([]string{"originalExporterIPv4Address"}, aggregatorElementsCommon...)
aggregatorElementsIPv6 = append([]string{"originalExporterIPv6Address"}, aggregatorElementsCommon...)
antreaInfoElementsIPv4 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv4"}...)
antreaInfoElementsIPv6 = append(antreaInfoElementsCommon, []string{"destinationClusterIPv6"}...)

nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -182,10 +177,8 @@ type flowAggregator struct {
activeFlowRecordTimeout time.Duration
inactiveFlowRecordTimeout time.Duration
exportingProcess ipfix.IPFIXExportingProcess
templateIDv4Expv4 uint16
templateIDv4Expv6 uint16
templateIDv6Expv4 uint16
templateIDv6Expv6 uint16
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
set ipfixentities.Set
flowAggregatorAddress string
Expand Down Expand Up @@ -305,28 +298,20 @@ func (fa *flowAggregator) InitAggregationProcess() error {
return err
}

func (fa *flowAggregator) createAndSendTemplate(isRecordIPv6, isOriginExporterIPv6 bool) (uint16, error) {
func (fa *flowAggregator) createAndSendTemplate(isRecordIPv6 bool) (uint16, error) {
templateID := fa.exportingProcess.NewTemplateID()
// If Pod IPs (source and destination IP) in the flow record belong to IPv4 Family and
// original exporter IP belongs to IPv6 family, we will send template with ID templateIDv4Expv6,
// which has sourceIPv4Address, destinationIPv4Address and originalExporterIPv6Address.
// Same applies to other combinations.
recordIPFamily := "IPv4"
exporterIPFamily := "IPv4"
if isRecordIPv6 {
recordIPFamily = "IPv6"
}
if isOriginExporterIPv6 {
exporterIPFamily = "IPv6"
}
bytesSent, err := fa.sendTemplateSet(isRecordIPv6, isOriginExporterIPv6)
bytesSent, err := fa.sendTemplateSet(isRecordIPv6)
if err != nil {
fa.exportingProcess.CloseConnToCollector()
fa.exportingProcess = nil
fa.set.ResetSet()
return 0, fmt.Errorf("sending %s template set with %s original exporter ip failed, err: %v", recordIPFamily, exporterIPFamily, err)
return 0, fmt.Errorf("sending %s template set failed, err: %v", recordIPFamily, err)
}
klog.V(2).InfoS("Exporting process initialized", "bytesSent", bytesSent, "templateSetIPFamily", recordIPFamily, "originalExporterIPFamily", exporterIPFamily)
klog.V(2).InfoS("Exporting process initialized", "bytesSent", bytesSent, "templateSetIPFamily", recordIPFamily)
return templateID, nil
}

Expand Down Expand Up @@ -360,18 +345,11 @@ func (fa *flowAggregator) initExportingProcess() error {
return fmt.Errorf("got error when initializing IPFIX exporting process: %v", err)
}
fa.exportingProcess = ep
// Currently, we send 4 templates for covering all the cases in dual-stack clusters, where Pod IPs
// and original exporter IP could belong to different IP families.
if fa.templateIDv4Expv4, err = fa.createAndSendTemplate(false, false); err != nil {
// Currently, we send two templates for IPv4 and IPv6 regardless of the IP families supported by cluster
if fa.templateIDv4, err = fa.createAndSendTemplate(false); err != nil {
return err
}
if fa.templateIDv4Expv6, err = fa.createAndSendTemplate(false, true); err != nil {
return err
}
if fa.templateIDv6Expv4, err = fa.createAndSendTemplate(true, false); err != nil {
return err
}
if fa.templateIDv6Expv6, err = fa.createAndSendTemplate(true, true); err != nil {
if fa.templateIDv6, err = fa.createAndSendTemplate(true); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -427,20 +405,9 @@ func (fa *flowAggregator) flowRecordExpiryCheck(stopCh <-chan struct{}) {

func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, record *ipfixintermediate.AggregationFlowRecord) error {
isRecordIPv4 := fa.aggregationProcess.IsAggregatedRecordIPv4(*record)
isOriginExporterIPv4 := fa.aggregationProcess.IsExporterOfAggregatedRecordIPv4(*record)
var templateID uint16
if isRecordIPv4 {
if isOriginExporterIPv4 {
templateID = fa.templateIDv4Expv4
} else {
templateID = fa.templateIDv4Expv6
}
} else {
if isOriginExporterIPv4 {
templateID = fa.templateIDv6Expv4
} else {
templateID = fa.templateIDv6Expv6
}
templateID := fa.templateIDv4
if !isRecordIPv4 {
templateID = fa.templateIDv6
}
// TODO: more records per data set will be supported when go-ipfix supports size check when adding records
fa.set.ResetSet()
Expand Down Expand Up @@ -471,25 +438,15 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
return nil
}

func (fa *flowAggregator) sendTemplateSet(isFlowKeyIPv6 bool, isOriginalExporterIPv6 bool) (int, error) {
func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
elements := make([]*ipfixentities.InfoElementWithValue, 0)
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
templateID := fa.templateIDv4Expv4
if isOriginalExporterIPv6 {
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv4Expv6
}
if isFlowKeyIPv6 {
templateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
if isOriginalExporterIPv6 {
aggregatorElements = aggregatorElementsIPv6
templateID = fa.templateIDv6Expv6
} else {
templateID = fa.templateIDv6Expv4
}
templateID = fa.templateIDv6
}
for _, ie := range ianaInfoElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
Expand All @@ -515,14 +472,6 @@ func (fa *flowAggregator) sendTemplateSet(isFlowKeyIPv6 bool, isOriginalExporter
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
for _, ie := range aggregatorElements {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
if err != nil {
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
}
ie := ipfixentities.NewInfoElementWithValue(element, nil)
elements = append(elements, ie)
}
for _, ie := range antreaSourceStatsElementList {
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID)
if err != nil {
Expand Down
41 changes: 14 additions & 27 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ import (
)

const (
testTemplateIDv4Expv4 = uint16(256)
testTemplateIDv4Expv6 = uint16(257)
testTemplateIDv6Expv4 = uint16(258)
testTemplateIDv6Expv6 = uint16(259)
testTemplateIDv4 = uint16(256)
testTemplateIDv6 = uint16(257)
testActiveTimeout = 60 * time.Second
testInactiveTimeout = 180 * time.Second
testObservationDomainID = 0xabcd
Expand Down Expand Up @@ -63,10 +61,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
activeFlowRecordTimeout: testActiveTimeout,
inactiveFlowRecordTimeout: testInactiveTimeout,
exportingProcess: mockIPFIXExpProc,
templateIDv4Expv4: testTemplateIDv4Expv4,
templateIDv4Expv6: testTemplateIDv4Expv6,
templateIDv6Expv4: testTemplateIDv6Expv4,
templateIDv6Expv6: testTemplateIDv6Expv6,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
registry: mockIPFIXRegistry,
set: mockDataSet,
flowAggregatorAddress: "",
Expand Down Expand Up @@ -115,9 +111,9 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
}

for _, tc := range testcases {
templateID := fa.templateIDv4Expv4
templateID := fa.templateIDv4
if tc.isIPv6 {
templateID = fa.templateIDv6Expv6
templateID = fa.templateIDv6
}
mockDataSet.EXPECT().ResetSet()
mockDataSet.EXPECT().PrepareSet(ipfixentities.Data, templateID).Return(nil)
Expand All @@ -140,7 +136,6 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
destinationPodLabelsIE := ipfixentities.NewInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes())
mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil)
mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord)
mockAggregationProcess.EXPECT().IsExporterOfAggregatedRecordIPv4(*tc.flowRecord).Return(!tc.isIPv6)
mockAggregationProcess.EXPECT().IsAggregatedRecordIPv4(*tc.flowRecord).Return(!tc.isIPv6)

err := fa.sendFlowKeyRecord(tc.flowKey, tc.flowRecord)
Expand All @@ -164,10 +159,8 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
aggregationProcess: nil,
activeFlowRecordTimeout: testActiveTimeout,
exportingProcess: mockIPFIXExpProc,
templateIDv4Expv4: testTemplateIDv4Expv4,
templateIDv4Expv6: testTemplateIDv4Expv6,
templateIDv6Expv4: testTemplateIDv6Expv4,
templateIDv6Expv6: testTemplateIDv6Expv6,
templateIDv4: testTemplateIDv4,
templateIDv6: testTemplateIDv6,
registry: mockIPFIXRegistry,
set: mockTempSet,
flowAggregatorAddress: "",
Expand All @@ -178,13 +171,11 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
for _, isIPv6 := range []bool{false, true} {
ianaInfoElements := ianaInfoElementsIPv4
antreaInfoElements := antreaInfoElementsIPv4
aggregatorElements := aggregatorElementsIPv4
testTemplateID := fa.templateIDv4Expv4
testTemplateID := fa.templateIDv4
if isIPv6 {
ianaInfoElements = ianaInfoElementsIPv6
antreaInfoElements = antreaInfoElementsIPv6
aggregatorElements = aggregatorElementsIPv6
testTemplateID = fa.templateIDv6Expv6
testTemplateID = fa.templateIDv6
}
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
// Only the element name is needed, other arguments have dummy values.
Expand All @@ -201,21 +192,17 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].Element, nil)
}
for i, ie := range aggregatorElements {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.IANAEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaSourceStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)].Element, nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].Element, nil)
}
for i, ie := range antreaDestinationStatsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)].Element, nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(antreaSourceStatsElementList)].Element, nil)
}
for i, ie := range antreaLabelsElementList {
elemList = append(elemList, ipfixentities.NewInfoElementWithValue(ipfixentities.NewInfoElement(ie, 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil))
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(aggregatorElements)+len(antreaSourceStatsElementList)+len(antreaDestinationStatsElementList)].Element, nil)
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(antreaSourceStatsElementList)+len(antreaDestinationStatsElementList)].Element, nil)
}
mockTempSet.EXPECT().ResetSet()
mockTempSet.EXPECT().PrepareSet(ipfixentities.Template, testTemplateID).Return(nil)
Expand All @@ -224,7 +211,7 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
// above elements: ianaInfoElements, ianaReverseInfoElements and antreaInfoElements.
mockIPFIXExpProc.EXPECT().SendSet(mockTempSet).Return(0, nil)

_, err := fa.sendTemplateSet(isIPv6, isIPv6)
_, err := fa.sendTemplateSet(isIPv6)
assert.NoErrorf(t, err, "Error in sending template record: %v, isIPv6: %v", err, isIPv6)
}
}
5 changes: 0 additions & 5 deletions pkg/ipfix/ipfix_intermediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type IPFIXAggregationProcess interface {
SetCorrelatedFieldsFilled(record *ipfixintermediate.AggregationFlowRecord)
AreCorrelatedFieldsFilled(record ipfixintermediate.AggregationFlowRecord) bool
IsAggregatedRecordIPv4(record ipfixintermediate.AggregationFlowRecord) bool
IsExporterOfAggregatedRecordIPv4(record ipfixintermediate.AggregationFlowRecord) bool
SetExternalFieldsFilled(record *ipfixintermediate.AggregationFlowRecord)
AreExternalFieldsFilled(record ipfixintermediate.AggregationFlowRecord) bool
}
Expand Down Expand Up @@ -87,10 +86,6 @@ func (ap *ipfixAggregationProcess) IsAggregatedRecordIPv4(record ipfixintermedia
return ap.AggregationProcess.IsAggregatedRecordIPv4(record)
}

func (ap *ipfixAggregationProcess) IsExporterOfAggregatedRecordIPv4(record ipfixintermediate.AggregationFlowRecord) bool {
return ap.AggregationProcess.IsExporterOfAggregatedRecordIPv4(record)
}

func (ap *ipfixAggregationProcess) SetExternalFieldsFilled(record *ipfixintermediate.AggregationFlowRecord) {
ap.AggregationProcess.SetExternalFieldsFilled(record, true)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/ipfix/testing/mock_ipfix.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4895f6a

Please sign in to comment.