NETOBSERV-1298: include duplicate list in the exported record
Signed-off-by: Mohamed Mahmoud <[email protected]>
msherif1234 committed Nov 24, 2023
1 parent 06f4492 commit 0f53015
Showing 13 changed files with 322 additions and 158 deletions.
pkg/agent/agent.go — 2 changes: 1 addition & 1 deletion

@@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
     rbTracer.SendsTo(accounter)
 
     if f.cfg.Deduper == DeduperFirstCome {
-        deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark),
+        deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge),
             node.ChannelBufferLen(f.cfg.BuffersLength))
         mapTracer.SendsTo(deduper)
         accounter.SendsTo(deduper)
pkg/agent/config.go — 2 changes: 2 additions & 0 deletions

@@ -86,6 +86,8 @@ type Config struct {
     DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
     // DeduperJustMark will just mark duplicates (boolean field) instead of dropping them.
     DeduperJustMark bool `env:"DEDUPER_JUST_MARK"`
+    // DeduperMerge will merge duplicated flows and generate list of interfaces and direction pairs
+    DeduperMerge bool `env:"DEDUPER_MERGE" envDefault:"false"`
     // Direction allows selecting which flows to trace according to its direction. Accepted values
     // are "ingress", "egress" or "both" (default).
     Direction string `env:"DIRECTION" envDefault:"both"`
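Note: a minimal sketch of toggling the new knob through the env tags shown above; only DEDUPER_MERGE is new in this commit, and the values are illustrative, not defaults.

```go
package main

import "os"

func main() {
	// Illustrative only — env variables read through the Config tags shown above.
	os.Setenv("DEDUPER_FC_EXPIRY", "2m")    // first-come dedupe cache expiry (example value)
	os.Setenv("DEDUPER_JUST_MARK", "false") // drop duplicates rather than only marking them
	os.Setenv("DEDUPER_MERGE", "true")      // new: merge duplicates into the record's DupList
}
```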
pkg/decode/decode_protobuf.go — 7 changes: 6 additions & 1 deletion

@@ -65,7 +65,12 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
     if flow.Packets != 0 {
         out["Packets"] = flow.Packets
     }
-
+    var interfaces []interface{}
+    var directions []interface{}
+    for _, entry := range flow.GetDupList() {
+        out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...)
+        out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...)
+    }
     ethType := ethernet.EtherType(flow.EthProtocol)
     if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
         out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
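Note: a hedged consumer-side sketch of the two keys populated above. The map literal stands in for PBFlowToMap output (config.GenericMap is a map-like type), and the index pairing mirrors the loop's one-entry-per-DupList-element behavior.

```go
package main

import "fmt"

func main() {
	// Stand-in for a decoded record; matches the expectations in the test below.
	out := map[string]interface{}{
		"Interfaces":     []interface{}{"eth0"},
		"FlowDirections": []interface{}{1},
	}
	ifaces := out["Interfaces"].([]interface{})
	dirs := out["FlowDirections"].([]interface{})
	for i := range ifaces {
		fmt.Printf("interface=%v direction=%v\n", ifaces[i], dirs[i])
	}
}
```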
pkg/decode/decode_protobuf_test.go — 10 changes: 9 additions & 1 deletion

@@ -17,7 +17,13 @@ func TestPBFlowToMap(t *testing.T) {
     someTime := time.Now()
     var someDuration time.Duration = 10000000 // 10ms
     flow := &pbflow.Record{
-        Interface:   "eth0",
+        Interface: "eth0",
+        DupList: []*pbflow.DupMapEntry{
+            {
+                Interface: "eth0",
+                Direction: pbflow.Direction_EGRESS,
+            },
+        },
         EthProtocol: 2048,
         Bytes:       456,
         Packets:     123,
@@ -64,6 +70,7 @@ func TestPBFlowToMap(t *testing.T) {
     delete(out, "TimeReceived")
     assert.Equal(t, config.GenericMap{
         "FlowDirection":  1,
+        "FlowDirections": []interface{}{1},
         "Bytes":          uint64(456),
         "SrcAddr":        "1.2.3.4",
         "DstAddr":        "5.6.7.8",
@@ -78,6 +85,7 @@ func TestPBFlowToMap(t *testing.T) {
         "Proto":           uint32(6),
         "TimeFlowStartMs": someTime.UnixMilli(),
         "TimeFlowEndMs":   someTime.UnixMilli(),
+        "Interfaces":      []interface{}{"eth0"},
         "Interface":       "eth0",
         "AgentIP":         "10.9.8.7",
         "Flags":           uint32(0x100),
pkg/exporter/kafka_proto_test.go — 6 changes: 4 additions & 2 deletions

@@ -57,7 +57,10 @@ func TestProtoConversion(t *testing.T) {
     var r pbflow.Record
     require.NoError(t, proto.Unmarshal(wc.messages[0].Value, &r))
     assert.EqualValues(t, 3, r.EthProtocol)
-    assert.EqualValues(t, 1, r.Direction)
+    for _, e := range r.DupList {
+        assert.EqualValues(t, 1, e.Direction)
+        assert.Equal(t, "veth0", e.Interface)
+    }
     assert.EqualValues(t, uint64(0xaabbccddeeff), r.DataLink.SrcMac)
     assert.EqualValues(t, uint64(0x112233445566), r.DataLink.DstMac)
     assert.EqualValues(t, uint64(0xC0010203) /* 192.1.2.3 */, r.Network.SrcAddr.GetIpv4())
@@ -71,7 +74,6 @@ func TestProtoConversion(t *testing.T) {
     assert.EqualValues(t, 789, r.Bytes)
     assert.EqualValues(t, 987, r.Packets)
     assert.EqualValues(t, uint16(1), r.Flags)
-    assert.Equal(t, "veth0", r.Interface)
     assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("127.3.2.1")), wc.messages[0].Key[0:16])
     assert.Equal(t, ByteArrayFromNetIP(net.ParseIP("192.1.2.3")), wc.messages[0].Key[16:])
 }
pkg/exporter/proto.go — 16 changes: 15 additions & 1 deletion

@@ -71,7 +71,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
         Duplicate:          fr.Duplicate,
         AgentIp:            agentIP(fr.AgentIP),
         Flags:              uint32(fr.Metrics.Flags),
-        Interface:          string(fr.Interface),
+        Interface:          fr.Interface,
         PktDropBytes:       fr.Metrics.PktDrops.Bytes,
         PktDropPackets:     uint64(fr.Metrics.PktDrops.Packets),
         PktDropLatestFlags: uint32(fr.Metrics.PktDrops.LatestFlags),
@@ -85,6 +85,13 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
     if fr.Metrics.DnsRecord.Latency != 0 {
         pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
     }
+    pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
+    for _, m := range fr.DupList {
+        pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
+            Interface: fr.Interface,
+            Direction: pbflow.Direction(m[fr.Interface]),
+        })
+    }
     return &pbflowRecord
 }
 
@@ -135,6 +142,13 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
     if fr.Metrics.DnsRecord.Latency != 0 {
         pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency)
     }
+    pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0)
+    for _, m := range fr.DupList {
+        pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{
+            Interface: fr.Interface,
+            Direction: pbflow.Direction(m[fr.Interface]),
+        })
+    }
     return &pbflowRecord
 }
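Note: the v4 and v6 loops above are identical; a hypothetical shared helper (dupListToPB is not part of this commit) could look like the sketch below. It ranges over each single-pair map instead of keying on fr.Interface, which would also cover entries recorded under other interface names.

```go
// Hypothetical helper, not in this commit — a sketch under the assumption that
// each fr.DupList element maps one interface name to its raw direction value.
func dupListToPB(dupList []map[string]uint8) []*pbflow.DupMapEntry {
	entries := make([]*pbflow.DupMapEntry, 0, len(dupList))
	for _, m := range dupList {
		for iface, dir := range m {
			entries = append(entries, &pbflow.DupMapEntry{
				Interface: iface,
				Direction: pbflow.Direction(dir),
			})
		}
	}
	return entries
}
```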
pkg/flow/account_test.go — 4 changes: 4 additions & 0 deletions

@@ -109,6 +109,7 @@ func TestEvict_MaxEntries(t *testing.T) {
             },
             TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
             TimeFlowEnd:   now.Add(-(1000 - 789) * time.Nanosecond),
+            DupList:       make([]map[string]uint8, 0),
         },
         k2: {
             RawRecord: RawRecord{
@@ -119,6 +120,7 @@ func TestEvict_MaxEntries(t *testing.T) {
             },
             TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
             TimeFlowEnd:   now.Add(-(1000 - 456) * time.Nanosecond),
+            DupList:       make([]map[string]uint8, 0),
         },
     }, received)
 }
@@ -187,6 +189,7 @@ func TestEvict_Period(t *testing.T) {
         },
         TimeFlowStart: now.Add(-1000 + 123),
         TimeFlowEnd:   now.Add(-1000 + 789),
+        DupList:       make([]map[string]uint8, 0),
     }, *records[0])
     records = receiveTimeout(t, evictor)
     require.Len(t, records, 1)
@@ -203,6 +206,7 @@ func TestEvict_Period(t *testing.T) {
         },
         TimeFlowStart: now.Add(-1000 + 1123),
         TimeFlowEnd:   now.Add(-1000 + 1456),
+        DupList:       make([]map[string]uint8, 0),
     }, *records[0])
 
     // no more flows are evicted
pkg/flow/deduper.go — 17 changes: 14 additions & 3 deletions

@@ -29,14 +29,15 @@ type entry struct {
     dnsRecord  *ebpf.BpfDnsRecordT
     ifIndex    uint32
     expiryTime time.Time
+    dupList    *[]map[string]uint8
 }
 
 // Dedupe receives flows and filters these belonging to duplicate interfaces. It will forward
 // the flows from the first interface coming to it, until that flow expires in the cache
 // (no activity for it during the expiration time)
 // The justMark argument tells that the deduper should not drop the duplicate flows but
 // set their Duplicate field.
-func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, out chan<- []*Record) {
+func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []*Record, out chan<- []*Record) {
     cache := &deduperCache{
         expire:  expireTime,
         entries: list.New(),
@@ -47,7 +48,7 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
             cache.removeExpired()
             fwd := make([]*Record, 0, len(records))
             for _, record := range records {
-                cache.checkDupe(record, justMark, &fwd)
+                cache.checkDupe(record, justMark, mergeDup, &fwd)
             }
             if len(fwd) > 0 {
                 out <- fwd
@@ -57,7 +58,8 @@ func Dedupe(expireTime time.Duration, justMark bool) func(in <-chan []*Record, o
     }
 
 // checkDupe check current record if its already available nad if not added to fwd records list
-func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
+func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record) {
+    mergeEntry := make(map[string]uint8)
     rk := r.Id
     // zeroes fields from key that should be ignored from the flow comparison
     rk.IfIndex = 0
@@ -86,6 +88,10 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
             r.Duplicate = true
             *fwd = append(*fwd, r)
         }
+        if mergeDup {
+            mergeEntry[r.Interface] = r.Id.Direction
+            *fEntry.dupList = append(*fEntry.dupList, mergeEntry)
+        }
         return
     }
     *fwd = append(*fwd, r)
@@ -99,6 +105,11 @@ func (c *deduperCache) checkDupe(r *Record, justMark bool, fwd *[]*Record) {
         ifIndex:    r.Id.IfIndex,
         expiryTime: timeNow().Add(c.expire),
     }
+    if mergeDup {
+        mergeEntry[r.Interface] = r.Id.Direction
+        r.DupList = append(r.DupList, mergeEntry)
+        e.dupList = &r.DupList
+    }
     c.ifaces[rk] = c.entries.PushFront(&e)
     *fwd = append(*fwd, r)
 }
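Note: a simplified, standalone sketch of the merge path above (toy types, not the agent's). The real code shares the kept record's DupList slice through the cache entry's dupList pointer; the effect is the same — duplicates contribute their interface/direction pair to the record that is forwarded.

```go
package main

import "fmt"

// The first record per flow key is kept; later duplicates only contribute
// their (interface, direction) pair to the kept record's dupList.
type record struct {
	key     string // stands in for the zeroed-interface flow key rk
	iface   string
	dir     uint8
	dupList []map[string]uint8
}

func main() {
	cache := map[string]*record{} // flow key → first-seen record
	in := []*record{
		{key: "flow1", iface: "eth0", dir: 1},
		{key: "flow1", iface: "veth0", dir: 0},
	}
	var fwd []*record
	for _, r := range in {
		if kept, ok := cache[r.key]; ok {
			// Duplicate: merge its interface/direction pair, do not forward.
			kept.dupList = append(kept.dupList, map[string]uint8{r.iface: r.dir})
			continue
		}
		r.dupList = append(r.dupList, map[string]uint8{r.iface: r.dir})
		cache[r.key] = r
		fwd = append(fwd, r)
	}
	fmt.Println(len(fwd), fwd[0].dupList) // 1 [map[eth0:1] map[veth0:0]]
}
```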
pkg/flow/deduper_test.go — 19 changes: 17 additions & 2 deletions

@@ -56,7 +56,7 @@ func TestDedupe(t *testing.T) {
     input := make(chan []*Record, 100)
     output := make(chan []*Record, 100)
 
-    go Dedupe(time.Minute, false)(input, output)
+    go Dedupe(time.Minute, false, false)(input, output)
 
     input <- []*Record{
         oneIf2, // record 1 at interface 2: should be accepted
@@ -89,7 +89,7 @@ func TestDedupe_EvictFlows(t *testing.T) {
     input := make(chan []*Record, 100)
     output := make(chan []*Record, 100)
 
-    go Dedupe(15*time.Second, false)(input, output)
+    go Dedupe(15*time.Second, false, false)(input, output)
 
     // Should only accept records 1 and 2, at interface 1
     input <- []*Record{oneIf1, twoIf1, oneIf2}
@@ -120,6 +120,21 @@ func TestDedupe_EvictFlows(t *testing.T) {
         receiveTimeout(t, output))
 }
 
+func TestDedupeMerge(t *testing.T) {
+    input := make(chan []*Record, 100)
+    output := make(chan []*Record, 100)
+
+    go Dedupe(time.Minute, false, true)(input, output)
+
+    input <- []*Record{
+        oneIf2, // record 1 at interface 2: should be accepted
+        oneIf1,
+    }
+    deduped := receiveTimeout(t, output)
+    assert.Equal(t, []*Record{oneIf2}, deduped)
+    assert.Equal(t, 2, len(oneIf2.DupList))
+}
+
 type timerMock struct {
     now time.Time
 }
pkg/flow/record.go — 2 changes: 2 additions & 0 deletions

@@ -53,6 +53,7 @@ type Record struct {
     AgentIP net.IP
     // Calculated RTT which is set when record is created by calling NewRecord
     TimeFlowRtt time.Duration
+    DupList     []map[string]uint8
 }
 
 func NewRecord(
@@ -78,6 +79,7 @@ func NewRecord(
     if metrics.DnsRecord.Latency != 0 {
         record.DNSLatency = time.Duration(metrics.DnsRecord.Latency)
     }
+    record.DupList = make([]map[string]uint8, 0)
     return &record
 }
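Note: each DupList element carries exactly one interface-name-to-direction pair — that is how the deduper builds mergeEntry. A shape sketch follows; EGRESS = 1 is confirmed by the tests above, while INGRESS = 0 is an assumption here.

```go
package main

import "fmt"

func main() {
	// One single-pair map per sighting of the same flow.
	dup := []map[string]uint8{
		{"eth0": 1},  // first sighting, egress (1, per the tests above)
		{"veth0": 0}, // duplicate sighting, assumed ingress (0)
	}
	fmt.Println(dup) // [map[eth0:1] map[veth0:0]]
}
```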
pkg/grpc/grpc_test.go — 21 changes: 15 additions & 6 deletions

@@ -142,8 +142,12 @@ func BenchmarkIPv4GRPCCommunication(b *testing.B) {
         EthProtocol: 2048,
         Bytes:       456,
         Flags:       1,
-
-        Direction: pbflow.Direction_EGRESS,
+        DupList: []*pbflow.DupMapEntry{
+            {
+                Interface: "eth0",
+                Direction: pbflow.Direction_EGRESS,
+            },
+        },
         TimeFlowStart: timestamppb.Now(),
         TimeFlowEnd:   timestamppb.Now(),
         Network: &pbflow.Network{
@@ -196,10 +200,15 @@ func BenchmarkIPv6GRPCCommunication(b *testing.B) {
     client := cc.Client()
 
     f := &pbflow.Record{
-        EthProtocol:   2048,
-        Bytes:         456,
-        Flags:         1,
-        Direction:     pbflow.Direction_EGRESS,
+        EthProtocol: 2048,
+        Bytes:       456,
+        Flags:       1,
+        DupList: []*pbflow.DupMapEntry{
+            {
+                Interface: "eth0",
+                Direction: pbflow.Direction_EGRESS,
+            },
+        },
         TimeFlowStart: timestamppb.Now(),
         TimeFlowEnd:   timestamppb.Now(),
         Network: &pbflow.Network{
(2 of the 13 changed files are not rendered on this page.)
