Skip to content

Commit

Permalink
Add proper message size check for UDP
Browse files Browse the repository at this point in the history
Added a path MTU config parameter for InitExportingProcess function.
For UDP transport, based on path MTU parameter, we make a decision on
the message size. This is optional parameter for TCP transport; TCP
supports max socket buffer size (65535).
  • Loading branch information
srikartati committed Dec 2, 2020
1 parent 8cef4ef commit 0e95fc4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 21 deletions.
2 changes: 2 additions & 0 deletions pkg/entities/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

const (
MaxTcpSocketMsgSize uint16 = 65535
DefaultUDPMsgSize uint16 = 512
MaxUDPMsgSize uint16 = 1500
)

// Message represents IPFIX message.
Expand Down
37 changes: 27 additions & 10 deletions pkg/exporter/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,22 @@ type ExportingProcess struct {
obsDomainID uint32
seqNumber uint32
templateID uint16
pathMTU uint16
templatesMap map[uint16]templateValue
templateRefCh chan struct{}
mutex sync.Mutex
}

// InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID) and tempRefTimeout
// (template refresh timeout). tempRefTimeout is applicable only for collectors listening over UDP; unit is seconds. For TCP, you can
// pass any value. For UDP, if 0 is passed, consider 1800s as default.
// TODO: Get obsID, tempRefTimeout as args which can be of dynamic size supporting both TCP and UDP.
func InitExportingProcess(collectorAddr net.Addr, obsID uint32, tempRefTimeout uint32) (*ExportingProcess, error) {
// InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID)
// and tempRefTimeout(template refresh timeout). tempRefTimeout is applicable only
// for collectors listening over UDP; unit is seconds. For TCP, you can pass any
// value. For UDP, if 0 is passed, consider 1800s as default.
//
// PathMTU is recommended for UDP transport. If not given a valid value, i.e., either
// 0 or a value more than 1500, we consider a default value of 512B as per RFC7011.
// PathMTU is optional for TCP as we use max socket buffer size of 65535. It can
// be provided as 0.
func InitExportingProcess(collectorAddr net.Addr, obsID uint32, tempRefTimeout uint32, pathMTU uint16) (*ExportingProcess, error) {
conn, err := net.Dial(collectorAddr.Network(), collectorAddr.String())
if err != nil {
klog.Errorf("Cannot the create the connection to configured ExportingProcess %s: %v", collectorAddr.String(), err)
Expand All @@ -64,12 +70,16 @@ func InitExportingProcess(collectorAddr net.Addr, obsID uint32, tempRefTimeout u
obsDomainID: obsID,
seqNumber: 0,
templateID: startTemplateID,
pathMTU: pathMTU,
templatesMap: make(map[uint16]templateValue),
templateRefCh: make(chan struct{}),
}

// Template refresh logic is only for UDP transport.
// Template refresh logic and pathMTU check is only required for UDP transport.
if collectorAddr.Network() == "udp" {
if expProc.pathMTU == 0 || expProc.pathMTU > entities.MaxUDPMsgSize {
expProc.pathMTU = entities.DefaultUDPMsgSize
}
if tempRefTimeout == 0 {
// Default value
tempRefTimeout = entities.TemplateRefreshTimeOut
Expand Down Expand Up @@ -109,6 +119,7 @@ func (ep *ExportingProcess) SendSet(set entities.Set) (int, error) {
}
}
}

// Update the length in set header before sending the message.
set.UpdateLenInHeader()
bytesSent, err := ep.createAndSendMsg(set)
Expand Down Expand Up @@ -149,11 +160,17 @@ func (ep *ExportingProcess) createAndSendMsg(set entities.Set) (int, error) {
return 0, fmt.Errorf("error when creating header: %v", err)
}

// Check if message is exceeding the limit with new set
// Check if message is exceeding the limit after adding the set. Include message
// header length too.
msgLen := uint16(msg.GetMsgBufferLen()) + set.GetBuffLen()
// TODO: Change the limit for UDP transport. This is only valid for TCP transport.
if msgLen > entities.MaxTcpSocketMsgSize {
return 0, fmt.Errorf("set size exceeds max socket size")
if ep.connToCollector.LocalAddr().Network() == "tcp" {
if msgLen > entities.MaxTcpSocketMsgSize {
return 0, fmt.Errorf("TCP transport: message size exceeds max socket buffer size")
}
} else {
if msgLen > ep.pathMTU {
return 0, fmt.Errorf("UDP transport: message size exceeds max pathMTU (set as %v)", ep.pathMTU)
}
}

// Set the fields in the message header.
Expand Down
36 changes: 26 additions & 10 deletions pkg/exporter/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestExportingProcess_SendingTemplateRecordToLocalTCPServer(t *testing.T) {
}()

// Create exporter using local server info
exporter, err := InitExportingProcess(listener.Addr(), 1, 0)
exporter, err := InitExportingProcess(listener.Addr(), 1, 0, 0)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestExportingProcess_SendingTemplateRecordToLocalUDPServer(t *testing.T) {
}()

// Create exporter using local server info
exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 1)
exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 1, 0)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", conn.LocalAddr().String(), err)
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
}()

// Create exporter using local server info
exporter, err := InitExportingProcess(listener.Addr(), 1, 0)
exporter, err := InitExportingProcess(listener.Addr(), 1, 0, 0)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", listener.Addr().String(), err)
}
Expand Down Expand Up @@ -247,13 +247,21 @@ func TestExportingProcess_SendingDataRecordToLocalTCPServer(t *testing.T) {
dataRecBytes := dataRecBuff.Bytes()

bytesSent, err := exporter.SendSet(dataSet)
if err != nil {
t.Fatalf("Got error when sending record: %v", err)
}
assert.NoError(t, err)
// 28 is the size of the IPFIX message including all headers (20 bytes)
assert.Equal(t, 28, bytesSent)
assert.Equal(t, dataRecBytes, <-buffCh)
assert.Equal(t, uint32(1), exporter.seqNumber)

// Create data set with multiple data records to test invalid message length
// logic for TCP transport.
dataSet = entities.NewSet(entities.Data, templateID, false)
for i := 0; i < 10000; i++ {
dataSet.AddRecord(elements, templateID)
}
bytesSent, err = exporter.SendSet(dataSet)
assert.Error(t, err)

exporter.CloseConnToCollector()
}

Expand Down Expand Up @@ -284,7 +292,7 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) {
}()

// Create exporter using local server info
exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 0)
exporter, err := InitExportingProcess(conn.LocalAddr(), 1, 0, 0)
if err != nil {
t.Fatalf("Got error when connecting to local server %s: %v", conn.LocalAddr().String(), err)
}
Expand Down Expand Up @@ -328,12 +336,20 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) {
dataRecBytes := dataRecBuff.Bytes()

bytesSent, err := exporter.SendSet(dataSet)
if err != nil {
t.Fatalf("Got error when sending record: %v", err)
}
assert.NoError(t, err)
// 28 is the size of the IPFIX message including all headers (20 bytes)
assert.Equal(t, 28, bytesSent)
assert.Equal(t, dataRecBytes, <-buffCh)
assert.Equal(t, uint32(1), exporter.seqNumber)

// Create data set with multiple data records to test invalid message length
// logic for
dataSet = entities.NewSet(entities.Data, templateID, false)
for i := 0; i < 100; i++ {
dataSet.AddRecord(elements, templateID)
}
bytesSent, err = exporter.SendSet(dataSet)
assert.Error(t, err)

exporter.CloseConnToCollector()
}
2 changes: 1 addition & 1 deletion pkg/test/exporter_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func testExporterToCollector(address net.Addr, isMultipleRecord bool, t *testing

go func() { // Start exporting process in go routine
time.Sleep(2 * time.Second) // wait for collector to be ready
export, err := exporter.InitExportingProcess(address, 1, 0)
export, err := exporter.InitExportingProcess(address, 1, 0, 0)
if err != nil {
klog.Fatalf("Got error when connecting to %s", address.String())
}
Expand Down

0 comments on commit 0e95fc4

Please sign in to comment.