diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index 6fe1b6ec51d..6db2ec0e7d2 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -90,7 +90,7 @@ const ( egressIP, appProtocolName, httpVals, - egressNodeName) + egressNodeName) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` diff --git a/pkg/flowaggregator/exporter/ipfix.go b/pkg/flowaggregator/exporter/ipfix.go index 4601632ea60..91d2a17a087 100644 --- a/pkg/flowaggregator/exporter/ipfix.go +++ b/pkg/flowaggregator/exporter/ipfix.go @@ -175,7 +175,9 @@ func (e *IPFIXExporter) sendRecord(record ipfixentities.Record, isRecordIPv6 boo if err != nil { return err } - klog.V(4).InfoS("Data set sent successfully", "bytes sent", sentBytes) + if klog.V(7).Enabled() { + klog.InfoS("Data set sent successfully", "bytes sent", sentBytes) + } return nil } diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index a5f4fba4365..7f7c1155d27 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -104,6 +104,7 @@ type flowAggregator struct { clusterUUID uuid.UUID aggregatorTransportProtocol flowaggregatorconfig.AggregatorTransportProtocol collectingProcess ipfix.IPFIXCollectingProcess + preprocessor *preprocessor aggregationProcess ipfix.IPFIXAggregationProcess activeFlowRecordTimeout time.Duration inactiveFlowRecordTimeout time.Duration @@ -175,13 +176,15 @@ func NewFlowAggregator( APIServer: opt.Config.APIServer, logTickerDuration: time.Minute, } - err = fa.InitCollectingProcess() - if err != nil { - return nil, fmt.Errorf("error when creating collecting process: %v", err) + if err := fa.InitCollectingProcess(); err != nil { + return nil, fmt.Errorf("error when creating collecting process: %w", err) } - err = fa.InitAggregationProcess() - if err != nil { - return nil, fmt.Errorf("error when creating aggregation process: %v", err) + recordCh := make(chan ipfixentities.Record) + if err := fa.InitPreprocessor(recordCh); err != nil { + return nil, fmt.Errorf("error when creating preprocessor: %w", err) + } + if err := fa.InitAggregationProcess(recordCh); err != nil { + return nil, fmt.Errorf("error when creating aggregation process: %w", err) } if opt.Config.ClickHouse.Enable { var err error @@ -261,15 +264,72 @@ func (fa *flowAggregator) InitCollectingProcess() error { len(infoelements.AntreaFlowEndSecondsElementList) + len(infoelements.AntreaThroughputElementList) + len(infoelements.AntreaSourceThroughputElementList) + len(infoelements.AntreaDestinationThroughputElementList) // clusterId cpInput.NumExtraElements += 1 + // Tell the collector to accept IEs which are not part of the IPFIX registry (hardcoded in + // the go-ipfix library). The preprocessor will take care of removing these elements. + cpInput.DecodingMode = collector.DecodingModeLenientKeepUnknown var err error fa.collectingProcess, err = collector.InitCollectingProcess(cpInput) return err } -func (fa *flowAggregator) InitAggregationProcess() error { +func (fa *flowAggregator) InitPreprocessor(recordCh chan<- ipfixentities.Record) error { + getInfoElementFromRegistry := func(ieName string, enterpriseID uint32) (*ipfixentities.InfoElement, error) { + ie, err := fa.registry.GetInfoElement(ieName, enterpriseID) + if err != nil { + return nil, fmt.Errorf("error when looking up IE %q in registry: %w", ieName, err) + } + return ie, err + } + + getInfoElements := func(isIPv4 bool) ([]*ipfixentities.InfoElement, error) { + ianaInfoElements := infoelements.IANAInfoElementsIPv4 + ianaReverseInfoElements := infoelements.IANAReverseInfoElements + antreaInfoElements := infoelements.AntreaInfoElementsIPv4 + if !isIPv4 { + ianaInfoElements = infoelements.IANAInfoElementsIPv6 + antreaInfoElements = infoelements.AntreaInfoElementsIPv6 + } + infoElements := make([]*ipfixentities.InfoElement, 0) + for _, ieName := range ianaInfoElements { + ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAEnterpriseID) + if err != nil { + return nil, err + } + infoElements = append(infoElements, ie) + } + for _, ieName := range ianaReverseInfoElements { + ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.IANAReversedEnterpriseID) + if err != nil { + return nil, err + } + infoElements = append(infoElements, ie) + } + for _, ieName := range antreaInfoElements { + ie, err := getInfoElementFromRegistry(ieName, ipfixregistry.AntreaEnterpriseID) + if err != nil { + return nil, err + } + infoElements = append(infoElements, ie) + } + return infoElements, nil + } + + infoElementsIPv4, err := getInfoElements(true) + if err != nil { + return err + } + infoElementsIPv6, err := getInfoElements(false) + if err != nil { + return err + } + fa.preprocessor, err = newPreprocessor(infoElementsIPv4, infoElementsIPv6, fa.collectingProcess.GetMsgChan(), recordCh) + return err +} + +func (fa *flowAggregator) InitAggregationProcess(recordCh <-chan ipfixentities.Record) error { var err error apInput := ipfixintermediate.AggregationInput{ - MessageChan: fa.collectingProcess.GetMsgChan(), + RecordChan: recordCh, WorkerNum: aggregationWorkerNum, CorrelateFields: correlateFields, ActiveExpiryTimeout: fa.activeFlowRecordTimeout, @@ -293,6 +353,11 @@ func (fa *flowAggregator) Run(stopCh <-chan struct{}) { fa.collectingProcess.Start() }() ipfixProcessesWg.Add(1) + go func() { + defer ipfixProcessesWg.Done() + fa.preprocessor.Run(stopCh) + }() + ipfixProcessesWg.Add(1) go func() { // Same comment as above. defer ipfixProcessesWg.Done() diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 09f1f906753..c63ccb5f175 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -548,6 +548,7 @@ func TestFlowAggregator_Run(t *testing.T) { activeFlowRecordTimeout: 1 * time.Hour, logTickerDuration: 1 * time.Hour, collectingProcess: mockCollectingProcess, + preprocessor: &preprocessor{}, aggregationProcess: mockAggregationProcess, ipfixExporter: mockIPFIXExporter, configWatcher: configWatcher, @@ -858,12 +859,12 @@ func TestFlowAggregator_InitAggregationProcess(t *testing.T) { activeFlowRecordTimeout: testActiveTimeout, inactiveFlowRecordTimeout: testInactiveTimeout, aggregatorTransportProtocol: flowaggregatorconfig.AggregatorTransportProtocolTCP, + registry: ipfix.NewIPFIXRegistry(), } - err := fa.InitCollectingProcess() - require.NoError(t, err) - - err = fa.InitAggregationProcess() - require.NoError(t, err) + require.NoError(t, fa.InitCollectingProcess()) + recordCh := make(chan ipfixentities.Record) + require.NoError(t, fa.InitPreprocessor(recordCh)) + require.NoError(t, fa.InitAggregationProcess(recordCh)) } func TestFlowAggregator_fillK8sMetadata(t *testing.T) { diff --git a/pkg/flowaggregator/preprocessor.go b/pkg/flowaggregator/preprocessor.go new file mode 100644 index 00000000000..ef4479d6faf --- /dev/null +++ b/pkg/flowaggregator/preprocessor.go @@ -0,0 +1,178 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flowaggregator + +import ( + "fmt" + "net" + + ipfixentities "github.com/vmware/go-ipfix/pkg/entities" + "k8s.io/klog/v2" +) + +// preprocessor is in charge of processing messages received from the IPFIX collector, prior to +// handling records over to the aggregation process. At the moment, its only task is to ensure that +// all records have the expected fields. If a record has extra fields, they will be discarded. If +// some fields are missing, they will be "appended" to the record with a "zero" value. For example, +// we will use 0 for integral types, "" for strings, 0.0.0.0 for IPv4 address, etc. Note that we are +// able to keep the implementation simple by assuming that a record either has missing fields or +// extra fields (not a combination of both), and that such fields are always at the tail of the +// field list. This assumption is based on implementation knowledge of the FlowExporter and the +// FlowAggregator. +type preprocessor struct { + inCh <-chan *ipfixentities.Message + outCh chan<- ipfixentities.Record + + expectedElementsV4 int + expectedElementsV6 int + + defaultElementsWithValueV4 []ipfixentities.InfoElementWithValue + defaultElementsWithValueV6 []ipfixentities.InfoElementWithValue +} + +func makeDefaultElementWithValue(ie *ipfixentities.InfoElement) (ipfixentities.InfoElementWithValue, error) { + switch ie.DataType { + case ipfixentities.OctetArray: + var val []byte + if ie.Len < ipfixentities.VariableLength { + val = make([]byte, ie.Len) + } + return ipfixentities.NewOctetArrayInfoElement(ie, val), nil + case ipfixentities.Unsigned8: + return ipfixentities.NewUnsigned8InfoElement(ie, 0), nil + case ipfixentities.Unsigned16: + return ipfixentities.NewUnsigned16InfoElement(ie, 0), nil + case ipfixentities.Unsigned32: + return ipfixentities.NewUnsigned32InfoElement(ie, 0), nil + case ipfixentities.Unsigned64: + return ipfixentities.NewUnsigned64InfoElement(ie, 0), nil + case ipfixentities.Signed8: + return ipfixentities.NewSigned8InfoElement(ie, 0), nil + case ipfixentities.Signed16: + return ipfixentities.NewSigned16InfoElement(ie, 0), nil + case ipfixentities.Signed32: + return ipfixentities.NewSigned32InfoElement(ie, 0), nil + case ipfixentities.Signed64: + return ipfixentities.NewSigned64InfoElement(ie, 0), nil + case ipfixentities.Float32: + return ipfixentities.NewFloat32InfoElement(ie, 0), nil + case ipfixentities.Float64: + return ipfixentities.NewFloat64InfoElement(ie, 0), nil + case ipfixentities.Boolean: + return ipfixentities.NewBoolInfoElement(ie, false), nil + case ipfixentities.DateTimeSeconds: + return ipfixentities.NewDateTimeSecondsInfoElement(ie, 0), nil + case ipfixentities.DateTimeMilliseconds: + return ipfixentities.NewDateTimeMillisecondsInfoElement(ie, 0), nil + case ipfixentities.MacAddress: + return ipfixentities.NewMacAddressInfoElement(ie, make([]byte, 6)), nil + case ipfixentities.Ipv4Address: + return ipfixentities.NewIPAddressInfoElement(ie, net.IPv4zero), nil + case ipfixentities.Ipv6Address: + return ipfixentities.NewIPAddressInfoElement(ie, net.IPv6zero), nil + case ipfixentities.String: + return ipfixentities.NewStringInfoElement(ie, ""), nil + default: + return nil, fmt.Errorf("unexpected Information Element data type: %d", ie.DataType) + } +} + +func makeDefaultElementsWithValue(infoElements []*ipfixentities.InfoElement) ([]ipfixentities.InfoElementWithValue, error) { + elementsWithValue := make([]ipfixentities.InfoElementWithValue, len(infoElements)) + for idx := range infoElements { + var err error + if elementsWithValue[idx], err = makeDefaultElementWithValue(infoElements[idx]); err != nil { + return nil, err + } + } + return elementsWithValue, nil +} + +func newPreprocessor(infoElementsV4, infoElementsV6 []*ipfixentities.InfoElement, inCh <-chan *ipfixentities.Message, outCh chan<- ipfixentities.Record) (*preprocessor, error) { + defaultElementsWithValueV4, err := makeDefaultElementsWithValue(infoElementsV4) + if err != nil { + return nil, fmt.Errorf("error when generating default values for IPv4 Information Elements expected from exporter: %w", err) + } + defaultElementsWithValueV6, err := makeDefaultElementsWithValue(infoElementsV6) + if err != nil { + return nil, fmt.Errorf("error when generating default values for IPv6 Information Elements expected from exporter: %w", err) + } + return &preprocessor{ + inCh: inCh, + outCh: outCh, + expectedElementsV4: len(infoElementsV4), + expectedElementsV6: len(infoElementsV6), + defaultElementsWithValueV4: defaultElementsWithValueV4, + defaultElementsWithValueV6: defaultElementsWithValueV6, + }, nil +} + +func (p *preprocessor) Run(stopCh <-chan struct{}) { + for { + select { + case <-stopCh: + return + case msg, ok := <-p.inCh: + if !ok { + return + } + p.processMsg(msg) + } + } +} + +func isRecordIPv4(record ipfixentities.Record) bool { + _, _, exist := record.GetInfoElementWithValue("sourceIPv4Address") + return exist +} + +func (p *preprocessor) processMsg(msg *ipfixentities.Message) { + set := msg.GetSet() + if set.GetSetType() != ipfixentities.Data { + return + } + records := set.GetRecords() + for _, record := range records { + elementList := record.GetOrderedElementList() + numElements := len(elementList) + isIPv4 := isRecordIPv4(record) + expectedElements := p.expectedElementsV4 + if !isIPv4 { + expectedElements = p.expectedElementsV6 + } + if numElements == expectedElements { + p.outCh <- record + } else if numElements > expectedElements { + if klog.V(5).Enabled() { + klog.InfoS("Record received from exporter includes unexpected elements, truncating", "expectedElements", expectedElements, "receivedElements", numElements) + } + // Creating a new Record seems like the best option here. By using + // NewDataRecordFromElements, we should minimize the number of allocations + // required. + p.outCh <- ipfixentities.NewDataRecordFromElements(0, elementList[:expectedElements], true) + } else { + if klog.V(5).Enabled() { + klog.InfoS("Record received from exporter is missing information elements, adding fields with zero values", "expectedElements", expectedElements, "receivedElements", numElements) + } + if isIPv4 { + elementList = append(elementList, p.defaultElementsWithValueV4[numElements:]...) + } else { + elementList = append(elementList, p.defaultElementsWithValueV6[numElements:]...) + } + p.outCh <- ipfixentities.NewDataRecordFromElements(0, elementList, true) + } + } + +} diff --git a/pkg/flowaggregator/preprocessor_test.go b/pkg/flowaggregator/preprocessor_test.go new file mode 100644 index 00000000000..100cc92883d --- /dev/null +++ b/pkg/flowaggregator/preprocessor_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flowaggregator + +import ( + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ipfixentities "github.com/vmware/go-ipfix/pkg/entities" +) + +func TestPreprocessorProcessMsg(t *testing.T) { + // For the sake of this test, we can use the same value for IPv4 and IPv6. + const testTemplateID = 256 + sourceIPv4AddressIE := ipfixentities.NewInfoElement("sourceIPv4Address", 8, 18, 0, 4) + destinationIPv4AddressIE := ipfixentities.NewInfoElement("destinationIPv4Address", 12, 18, 0, 4) + sourceIPv6AddressIE := ipfixentities.NewInfoElement("sourceIPv6Address", 27, 19, 0, 16) + destinationIPv6AddressIE := ipfixentities.NewInfoElement("destinationIPv6Address", 28, 19, 0, 16) + packetTotalCountIE := ipfixentities.NewInfoElement("packetTotalCount", 86, 4, 0, 8) + iesIPv4 := []*ipfixentities.InfoElement{sourceIPv4AddressIE, destinationIPv4AddressIE, packetTotalCountIE} + iesIPv6 := []*ipfixentities.InfoElement{sourceIPv6AddressIE, destinationIPv6AddressIE, packetTotalCountIE} + + iesWithValueIPv4 := []ipfixentities.InfoElementWithValue{ + ipfixentities.NewIPAddressInfoElement(sourceIPv4AddressIE, net.ParseIP("1.1.1.1")), + ipfixentities.NewIPAddressInfoElement(destinationIPv4AddressIE, net.ParseIP("1.1.2.1")), + } + iesWithValueIPv6 := []ipfixentities.InfoElementWithValue{ + ipfixentities.NewIPAddressInfoElement(sourceIPv6AddressIE, net.ParseIP("::1")), + ipfixentities.NewIPAddressInfoElement(destinationIPv6AddressIE, net.ParseIP("::2")), + } + + getTestMsg := func(iesWithValue []ipfixentities.InfoElementWithValue) *ipfixentities.Message { + s, err := ipfixentities.MakeDataSet(testTemplateID, iesWithValue) + require.NoError(t, err) + msg := ipfixentities.NewMessage(true) + msg.AddSet(s) + return msg + } + + testIPFamily := func(t *testing.T, iesWithValue []ipfixentities.InfoElementWithValue) { + require.Len(t, iesWithValue, 2) + testCases := []struct { + name string + templateElementsCount int + msg *ipfixentities.Message + expectedIEsWithValue []ipfixentities.InfoElementWithValue + }{ + { + name: "same elements", + templateElementsCount: 2, + msg: getTestMsg(iesWithValue[:2]), + expectedIEsWithValue: iesWithValue, + }, + { + name: "extra elements", + templateElementsCount: 1, + msg: getTestMsg(iesWithValue[:2]), + expectedIEsWithValue: iesWithValue[:1], + }, + { + name: "missing elements", + templateElementsCount: 3, + msg: getTestMsg(iesWithValue[:2]), + expectedIEsWithValue: append(iesWithValue, ipfixentities.NewUnsigned64InfoElement(packetTotalCountIE, 0)), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Buffered channel with capacity 1 to hold the output record generated by processMsg. + outCh := make(chan ipfixentities.Record, 1) + p, err := newPreprocessor(iesIPv4[:tc.templateElementsCount], iesIPv6[:tc.templateElementsCount], nil, outCh) + require.NoError(t, err) + p.processMsg(tc.msg) + var r ipfixentities.Record + select { + case r = <-outCh: + default: + } + if tc.expectedIEsWithValue == nil { + assert.Nil(t, r, "No record expected") + } else { + require.NotNil(t, r, "Record was expected") + assert.Equal(t, tc.expectedIEsWithValue, r.GetOrderedElementList()) + } + }) + } + } + + t.Run("ipv4", func(t *testing.T) { + testIPFamily(t, iesWithValueIPv4) + }) + t.Run("ipv6", func(t *testing.T) { + testIPFamily(t, iesWithValueIPv6) + }) +}