Skip to content

Commit

Permalink
Merge pull request #364 from jpinsonneau/1703
Browse files Browse the repository at this point in the history
NETOBSERV-1703 Add enrichment in packet capture
  • Loading branch information
jpinsonneau authored Jul 25, 2024
2 parents c9b6a78 + b39afec commit bef4343
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 29 deletions.
4 changes: 2 additions & 2 deletions examples/packetcapture-dump/client/packetcapture-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"os"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/google/gopacket/layers"
)
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {
os.Exit(1)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
_, err = f.Write(utils.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
if err != nil {
fmt.Println("Write file header failed:", err.Error())
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
case "ipfix+tcp":
return buildIPFIXExporter(cfg, "tcp")
case "direct-flp":
return buildDirectFLPExporter(cfg)
return buildFlowDirectFLPExporter(cfg)
default:
return nil, fmt.Errorf("wrong flow export type %s", cfg.Export)
}
Expand All @@ -323,7 +323,7 @@ func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
return grpcExporter.ExportFlows, nil
}

func buildDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
flpExporter, err := exporter.StartDirectFLP(cfg.FLPConfig, cfg.BuffersLength)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Config struct {
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the exporter protocol.
// Accepted values for Flows are: grpc (default), kafka, ipfix+udp, ipfix+tcp or direct-flp.
// Accepted values for Packets are: grpc (default) or tcp
// Accepted values for Packets are: grpc (default) or direct-flp
Export string `env:"EXPORT" envDefault:"grpc"`
// Host is the host name or IP of the flow or packet collector, when the EXPORT variable is
// set to "grpc"
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,21 @@ func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord],
switch cfg.Export {
case "grpc":
return buildGRPCPacketExporter(cfg)
case "direct-flp":
return buildPacketDirectFLPExporter(cfg)
default:
return nil, fmt.Errorf("unsupported packet export type %s", cfg.Export)
}
}

func buildPacketDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) {
flpExporter, err := exporter.StartDirectFLP(cfg.FLPConfig, cfg.BuffersLength)
if err != nil {
return nil, err
}
return flpExporter.ExportPackets, nil
}

// Run a Packets agent. The function will keep running in the same thread
// until the passed context is canceled
func (p *Packets) Run(ctx context.Context) error {
Expand Down
68 changes: 68 additions & 0 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package decode

import (
"encoding/base64"
"fmt"
"syscall"
"time"
Expand All @@ -9,6 +10,8 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/mdlayher/ethernet"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -139,6 +142,71 @@ func RecordToMap(fr *flow.Record) config.GenericMap {
return out
}

func PacketToMap(pr *flow.PacketRecord) config.GenericMap {
out := config.GenericMap{}

if pr == nil {
return out
}

packet := gopacket.NewPacket(pr.Stream, layers.LayerTypeEthernet, gopacket.Lazy)
if ethLayer := packet.Layer(layers.LayerTypeEthernet); ethLayer != nil {
eth, _ := ethLayer.(*layers.Ethernet)
out["SrcMac"] = eth.SrcMAC.String()
out["DstMac"] = eth.DstMAC.String()
}

if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, _ := tcpLayer.(*layers.TCP)
out["SrcPort"] = tcp.SrcPort.String()
out["DstPort"] = tcp.DstPort.String()
} else if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
out["SrcPort"] = udp.SrcPort.String()
out["DstPort"] = udp.DstPort.String()
} else if sctpLayer := packet.Layer(layers.LayerTypeSCTP); sctpLayer != nil {
sctp, _ := sctpLayer.(*layers.SCTP)
out["SrcPort"] = sctp.SrcPort.String()
out["DstPort"] = sctp.DstPort.String()
}

if ipv4Layer := packet.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
ipv4, _ := ipv4Layer.(*layers.IPv4)
out["SrcAddr"] = ipv4.SrcIP.String()
out["DstAddr"] = ipv4.DstIP.String()
out["Proto"] = ipv4.Protocol
} else if ipv6Layer := packet.Layer(layers.LayerTypeIPv6); ipv6Layer != nil {
ipv6, _ := ipv6Layer.(*layers.IPv6)
out["SrcAddr"] = ipv6.SrcIP.String()
out["DstAddr"] = ipv6.DstIP.String()
out["Proto"] = ipv6.NextHeader
}

if icmpv4Layer := packet.Layer(layers.LayerTypeICMPv4); icmpv4Layer != nil {
icmpv4, _ := icmpv4Layer.(*layers.ICMPv4)
out["IcmpType"] = icmpv4.TypeCode.Type()
out["IcmpCode"] = icmpv4.TypeCode.Code()
} else if icmpv6Layer := packet.Layer(layers.LayerTypeICMPv6); icmpv6Layer != nil {
icmpv6, _ := icmpv6Layer.(*layers.ICMPv6)
out["IcmpType"] = icmpv6.TypeCode.Type()
out["IcmpCode"] = icmpv6.TypeCode.Code()
}

if dnsLayer := packet.Layer(layers.LayerTypeDNS); dnsLayer != nil {
dns, _ := dnsLayer.(*layers.DNS)
out["DnsId"] = dns.ID
out["DnsFlagsResponseCode"] = dns.ResponseCode.String()
//TODO: add DNS questions / answers / authorities
}

out["Bytes"] = len(pr.Stream)
// Data is base64 encoded to avoid marshal / unmarshal issues
out["Data"] = base64.StdEncoding.EncodeToString(packet.Data())
out["Time"] = pr.Time.Unix()

return out
}

// TCPStateToStr is based on kernel TCP state definition
// https://elixir.bootlin.com/linux/v6.3/source/include/net/tcp_states.h#L12
func TCPStateToStr(state uint32) string {
Expand Down
12 changes: 12 additions & 0 deletions pkg/exporter/direct_flp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ func (d *DirectFLP) ExportFlows(input <-chan []*flow.Record) {
}
}

// ExportPackets accepts slices of *flow.PacketRecord by its input channel, converts them
// to *pbflow.Records instances, and submits them to the collector.
func (d *DirectFLP) ExportPackets(input <-chan []*flow.PacketRecord) {
for inputPackets := range input {
for _, packet := range inputPackets {
if len(packet.Stream) != 0 {
d.fwd <- decode.PacketToMap(packet)
}
}
}
}

func (d *DirectFLP) Close() {
close(d.fwd)
}
31 changes: 8 additions & 23 deletions pkg/exporter/grpc_packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package exporter

import (
"context"
"fmt"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/google/gopacket"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/anypb"
)
Expand All @@ -22,22 +22,14 @@ type GRPCPacketProto struct {
var gplog = logrus.WithField("component", "packet/GRPCPackets")

// WritePacket writes the given packet data out to gRPC.
func writeGRPCPacket(ci gopacket.CaptureInfo, data []byte, conn *grpc.ClientConnection) error {
if ci.CaptureLength != len(data) {
return fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data))
}
if ci.CaptureLength > ci.Length {
return fmt.Errorf("invalid capture info %+v: capture length > length", ci)
}
gplog.Debugf("Sending Packet to client. Length: %d", len(data))
b, err := GetPacketHeader(ci)
func writeGRPCPacket(time time.Time, data []byte, conn *grpc.ClientConnection) error {
bytes, err := utils.GetPacketBytesWithHeader(time, data)
if err != nil {
return fmt.Errorf("error writing packet header: %w", err)
return err
}
// write 16 byte packet header & data all at once
_, err = conn.Client().Send(context.TODO(), &pbpacket.Packet{
Pcap: &anypb.Any{
Value: append(b, data...),
Value: bytes,
},
})
return err
Expand All @@ -59,15 +51,8 @@ func (p *GRPCPacketProto) ExportGRPCPackets(in <-chan []*flow.PacketRecord) {
for packetRecord := range in {
var errs []error
for _, packet := range packetRecord {
packetStream := packet.Stream
packetTimestamp := packet.Time
if len(packetStream) != 0 {
captureInfo := gopacket.CaptureInfo{
Timestamp: packetTimestamp,
CaptureLength: len(packetStream),
Length: len(packetStream),
}
if err := writeGRPCPacket(captureInfo, packetStream, p.clientConn); err != nil {
if len(packet.Stream) != 0 {
if err := writeGRPCPacket(packet.Time, packet.Stream, p.clientConn); err != nil {
errs = append(errs, err)
}
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/exporter/packets_proto.go → pkg/utils/packets.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package exporter
package utils

import (
"encoding/binary"
"fmt"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand Down Expand Up @@ -38,3 +39,23 @@ func GetPacketHeader(ci gopacket.CaptureInfo) ([]byte, error) {
binary.LittleEndian.PutUint32(buf[12:16], uint32(ci.Length))
return buf[:], nil
}

func GetPacketBytesWithHeader(time time.Time, data []byte) ([]byte, error) {
ci := gopacket.CaptureInfo{
Timestamp: time,
CaptureLength: len(data),
Length: len(data),
}
if ci.CaptureLength != len(data) {
return nil, fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data))
}
if ci.CaptureLength > ci.Length {
return nil, fmt.Errorf("invalid capture info %+v: capture length > length", ci)
}
b, err := GetPacketHeader(ci)
if err != nil {
return nil, fmt.Errorf("error writing packet header: %w", err)
}
// append 16 byte packet header & data all at once
return append(b, data...), nil
}

0 comments on commit bef4343

Please sign in to comment.