diff --git a/.github/workflows/testing-dnstap.yml b/.github/workflows/testing-dnstap.yml index 4aac55e8..7796c473 100644 --- a/.github/workflows/testing-dnstap.yml +++ b/.github/workflows/testing-dnstap.yml @@ -27,13 +27,15 @@ jobs: with: go-version: ${{ matrix.go-version }} - uses: actions/setup-python@v4 + with: + python-version: 3.11 - name: build binary run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go - name: Deploy docker image run: | - sudo docker run -d --network="host" --name=unbound --volume=$PWD/testsdata/unbound_${{ matrix.mode }}.conf:/opt/unbound/etc/unbound/unbound.conf:z -v /tmp/:/opt/unbound/etc/unbound/tmp/:z mvance/unbound:${{ matrix.unbound }} + sudo docker run -d --network="host" --name=unbound --volume=$PWD/testsdata/unbound/unbound_${{ matrix.mode }}.conf:/opt/unbound/etc/unbound/unbound.conf:z -v /tmp/:/opt/unbound/etc/unbound/tmp/:z mvance/unbound:${{ matrix.unbound }} until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done - name: Test ${{ matrix.mode }} @@ -57,13 +59,15 @@ jobs: with: go-version: ${{ matrix.go-version }} - uses: actions/setup-python@v4 + with: + python-version: 3.11 - name: build binary run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go - name: Deploy coredns docker image run: | - sudo docker run -d --network="host" --name=coredns -v $PWD/testsdata/:$PWD/testsdata/ -v /tmp/:/tmp/ coredns/coredns:${{ matrix.coredns }} -conf $PWD/testsdata/coredns_${{ matrix.mode }}.conf + sudo docker run -d --network="host" --name=coredns -v $PWD/testsdata/:$PWD/testsdata/ -v /tmp/:/tmp/ coredns/coredns:${{ matrix.coredns }} -conf $PWD/testsdata/coredns/coredns_${{ matrix.mode }}.conf until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done - name: Test ${{ matrix.mode }} @@ -90,6 +94,8 @@ jobs: with: go-version: ${{ matrix.go-version }} - uses: actions/setup-python@v4 + with: + python-version: 3.11 - name: build binary run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go @@ -101,7 +107,7 @@ jobs: - name: Deploy dnsdist docker image run: | - sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/dnsdist_${{ matrix.mode }}.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }} + sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/powerdns/dnsdist_${{ matrix.mode }}.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }} until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done - name: Test ${{ matrix.mode }} diff --git a/.github/workflows/testing-go.yml b/.github/workflows/testing-go.yml index 1e51d837..70bb9c73 100644 --- a/.github/workflows/testing-go.yml +++ b/.github/workflows/testing-go.yml @@ -14,7 +14,7 @@ jobs: matrix: os-version: ['ubuntu-22.04', 'macos-12'] go-version: ['1.19', '1.20'] - package: ['dnsutils', 'collectors', 'loggers', 'transformers'] + package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib'] runs-on: ${{ matrix.os-version }} diff --git a/.github/workflows/testing-powerdns.yml b/.github/workflows/testing-powerdns.yml index 94578b53..727bab27 100644 --- a/.github/workflows/testing-powerdns.yml +++ b/.github/workflows/testing-powerdns.yml @@ -32,7 +32,7 @@ jobs: - name: Deploy dnsdist docker image run: | - sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/dnsdist_protobuf.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }} + sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/powerdns/dnsdist_protobuf.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }} until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done - name: Test dns query @@ -60,7 +60,7 @@ jobs: - name: Deploy recursor docker image run: | - sudo docker run -d --network="host" --name=recursor --volume=$PWD/testsdata/pdns_recursor.lua:/etc/powerdns/recursor.lua:z --volume=$PWD/testsdata/pdns_recursor.conf:/etc/powerdns/recursor.conf:z powerdns/pdns-recursor-${{ matrix.recursor }} + sudo docker run -d --network="host" --name=recursor --volume=$PWD/testsdata/powerdns/pdns_recursor.lua:/etc/powerdns/recursor.lua:z --volume=$PWD/testsdata/powerdns/pdns_recursor.conf:/etc/powerdns/recursor.conf:z powerdns/pdns-recursor-${{ matrix.recursor }} until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done - name: Test send query diff --git a/README.md b/README.md index 8c01c5da..96b9efeb 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,9 @@ `DNS-collector` acts as a passive high speed **aggregator, analyzer, transporter and logging** for your DNS messages, written in **Golang**. The DNS traffic can be collected and aggregated from simultaneously [sources](doc/collectors.md) like DNStap streams, network interface or log files and relays it to multiple other [listeners](doc/loggers.md) with some [transformations](doc/transformers.md) on it ([traffic filtering](doc/transformers.md#dns-filtering), [user privacy](doc/transformers.md#user-privacy), ...) and DNS protocol conversions (to [plain text](doc/configuration.md#custom-text-format), [json](doc/dnsjson.md), and more... ). Additionally, DNS-collector also support -- [`EDNS`](doc/dnsparser.md) parser +- [Extension Mechanisms for DNS (EDNS)](doc/dnsparser.md) decoding +- IPv4/v6 defragmentation and TCP reassembly - Nanoseconds in timestamps -- TCP reassembly -- IPv4 and IPv6 addresses **Overview**: diff --git a/collectors/dnstap.go b/collectors/dnstap.go index dba0a77b..48dfa554 100644 --- a/collectors/dnstap.go +++ b/collectors/dnstap.go @@ -10,6 +10,7 @@ import ( "time" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/netlib" "github.com/dmachard/go-framestream" "github.com/dmachard/go-logger" ) @@ -228,7 +229,7 @@ func (c *Dnstap) Run() { is_tls = true } - before, actual, err := SetSock_RCVBUF(conn, c.config.Collectors.Dnstap.RcvBufSize, is_tls) + before, actual, err := netlib.SetSock_RCVBUF(conn, c.config.Collectors.Dnstap.RcvBufSize, is_tls) if err != nil { c.logger.Fatal("Unable to set SO_RCVBUF: ", err) } diff --git a/collectors/file_ingestor.go b/collectors/file_ingestor.go index 3537109e..a9744fee 100644 --- a/collectors/file_ingestor.go +++ b/collectors/file_ingestor.go @@ -12,13 +12,13 @@ import ( "time" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/netlib" "github.com/dmachard/go-logger" framestream "github.com/farsightsec/golang-framestream" "github.com/fsnotify/fsnotify" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcapgo" - "github.com/google/gopacket/tcpassembly" ) var waitFor = 10 * time.Second @@ -158,60 +158,81 @@ func (c *FileIngestor) ProcessPcap(filePath string) { return } - reassembleChan := make(chan DnsDataStruct) - - streamFactory := &DnsStreamFactory{reassembled: reassembleChan} - streamPool := tcpassembly.NewStreamPool(streamFactory) - assembler := tcpassembly.NewAssembler(streamPool) + dnsChan := make(chan netlib.DnsPacket) + udpChan := make(chan gopacket.Packet) + tcpChan := make(chan gopacket.Packet) + fragIp4Chan := make(chan gopacket.Packet) + fragIp6Chan := make(chan gopacket.Packet) packetSource := gopacket.NewPacketSource(pcapHandler, pcapHandler.LinkType()) packetSource.DecodeOptions.Lazy = true + packetSource.NoCopy = true - done := make(chan bool) + // defrag ipv4 + go netlib.IpDefragger(fragIp4Chan, udpChan, tcpChan) + // defrag ipv6 + go netlib.IpDefragger(fragIp6Chan, udpChan, tcpChan) + // tcp assembly + go netlib.TcpAssembler(tcpChan, dnsChan, c.filterDnsPort) + // udp processor + go netlib.UdpProcessor(udpChan, dnsChan, c.filterDnsPort) go func() { nbPackets := 0 + lastReceivedTime := time.Now() for { - dnsPacket := <-reassembleChan - if len(dnsPacket.Payload) == 0 { - c.LogInfo("end of dns packet to process, exit loop") - break - } - - // prepare dns message - dm := dnsutils.DnsMessage{} - dm.Init() - - dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() - dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() - dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() - dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() - dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() - dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() - - dm.DNS.Payload = dnsPacket.Payload - dm.DNS.Length = len(dnsPacket.Payload) - - dm.DnsTap.Identity = c.identity - dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second() - dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano()) - - // count it - nbPackets++ - - // send DNS message to DNS processor - c.dnsProcessor.GetChannel() <- dm + select { + case dnsPacket, noMore := <-dnsChan: + if !noMore { + goto end + } + lastReceivedTime = time.Now() + // prepare dns message + dm := dnsutils.DnsMessage{} + dm.Init() + + dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() + dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() + dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() + dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() + dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() + dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() + dm.NetworkInfo.IpDefragmented = dnsPacket.IpDefragmented + dm.NetworkInfo.TcpReassembled = dnsPacket.TcpReassembled + + dm.DNS.Payload = dnsPacket.Payload + dm.DNS.Length = len(dnsPacket.Payload) + + dm.DnsTap.Identity = c.identity + dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second() + dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano()) + + // count it + nbPackets++ + + // send DNS message to DNS processor + c.dnsProcessor.GetChannel() <- dm + case <-time.After(10 * time.Second): + elapsed := time.Since(lastReceivedTime) + if elapsed >= 10*time.Second { + close(fragIp4Chan) + close(fragIp6Chan) + close(udpChan) + close(tcpChan) + close(dnsChan) + } + } } - c.LogInfo("number of [%d] packet(s) processed", nbPackets) - done <- true + end: + c.LogInfo("pcap file [%s]: %d DNS packet(s) detected", fileName, nbPackets) }() + nbPackets := 0 for { packet, err := packetSource.NextPacket() if errors.Is(err, io.EOF) { - c.LogInfo("end of packet from pcap file, exit loop") break } if err != nil { @@ -219,44 +240,47 @@ func (c *FileIngestor) ProcessPcap(filePath string) { break } - if packet.TransportLayer().LayerType() == layers.LayerTypeUDP { - p := packet.TransportLayer().(*layers.UDP) - if int(p.SrcPort) != c.filterDnsPort && int(p.DstPort) != c.filterDnsPort { - continue - } + nbPackets++ - reassembleChan <- DnsDataStruct{ - Payload: p.Payload, - IpLayer: packet.NetworkLayer().NetworkFlow(), - TransportLayer: p.TransportFlow(), - Timestamp: packet.Metadata().Timestamp, - } + // some security checks + if packet.NetworkLayer() == nil { + continue + } + if packet.TransportLayer() == nil { + continue + } + // ipv4 fragmented packet ? + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := packet.NetworkLayer().(*layers.IPv4) + if ip4.Flags&layers.IPv4MoreFragments == 1 || ip4.FragOffset > 0 { + fragIp4Chan <- packet + continue + } } - if packet.TransportLayer().LayerType() == layers.LayerTypeTCP { - p := packet.TransportLayer().(*layers.TCP) - if int(p.SrcPort) != c.filterDnsPort && int(p.DstPort) != c.filterDnsPort { + // ipv6 fragmented packet ? + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + v6frag := packet.Layer(layers.LayerTypeIPv6Fragment) + if v6frag != nil { + fragIp6Chan <- packet continue } - assembler.AssembleWithTimestamp( - packet.NetworkLayer().NetworkFlow(), - packet.TransportLayer().(*layers.TCP), - packet.Metadata().Timestamp, - ) + } + // tcp or udp packets ? + if packet.TransportLayer().LayerType() == layers.LayerTypeUDP { + udpChan <- packet + } + if packet.TransportLayer().LayerType() == layers.LayerTypeTCP { + tcpChan <- packet } } // remove it ? - assembler.FlushAll() - c.LogInfo("processing of pcap file [%s] terminated", fileName) - - // send empty packet to stop the goroutine - // and wait - reassembleChan <- DnsDataStruct{} - <-done + //assembler.FlushAll() + c.LogInfo("pcap file [%s] processing terminated, %d packet(s) read", fileName, nbPackets) // remove it ? if c.config.Collectors.FileIngestor.DeleteAfter { diff --git a/collectors/sniffer_afpacket.go b/collectors/sniffer_afpacket.go index 7ff8c258..162d4637 100644 --- a/collectors/sniffer_afpacket.go +++ b/collectors/sniffer_afpacket.go @@ -12,10 +12,10 @@ import ( "unsafe" "github.com/dmachard/go-dnscollector/dnsutils" + "github.com/dmachard/go-dnscollector/netlib" "github.com/dmachard/go-logger" "github.com/google/gopacket" "github.com/google/gopacket/layers" - "github.com/google/gopacket/tcpassembly" "golang.org/x/net/bpf" "golang.org/x/sys/unix" ) @@ -276,62 +276,56 @@ func (c *AfpacketSniffer) Run() { dnsProcessor.queryTimeout = c.config.Collectors.AfpacketLiveCapture.QueryTimeout go dnsProcessor.Run(c.Loggers()) - var eth layers.Ethernet - var ip4 layers.IPv4 - var ip6 layers.IPv6 - var ipFlow gopacket.Flow - var tcp layers.TCP - var udp layers.UDP - parserLayers := gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet, ð, &ip4, &ip6, &tcp, &udp) - decodedLayers := make([]gopacket.LayerType, 0, 4) + dnsChan := make(chan netlib.DnsPacket) + udpChan := make(chan gopacket.Packet) + tcpChan := make(chan gopacket.Packet) + fragIp4Chan := make(chan gopacket.Packet) + fragIp6Chan := make(chan gopacket.Packet) - reassembleChan := make(chan DnsDataStruct) + netDecoder := &netlib.NetDecoder{} - streamFactory := &DnsStreamFactory{reassembled: reassembleChan} - streamPool := tcpassembly.NewStreamPool(streamFactory) - assembler := tcpassembly.NewAssembler(streamPool) - - ticker := time.NewTicker(time.Minute * 1) + // defrag ipv4 + go netlib.IpDefragger(fragIp4Chan, udpChan, tcpChan) + // defrag ipv6 + go netlib.IpDefragger(fragIp6Chan, udpChan, tcpChan) + // tcp assembly + go netlib.TcpAssembler(tcpChan, dnsChan, 0) + // udp processor + go netlib.UdpProcessor(udpChan, dnsChan, 0) // goroutine to read all packets reassembled go func() { // prepare dns message dm := dnsutils.DnsMessage{} - for { - select { - // read packet from channel - case dnsPacket := <-reassembleChan: - // reset - dm.Init() - - dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() - dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() - dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() - dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() - dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() - dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() - - dm.DNS.Payload = dnsPacket.Payload - dm.DNS.Length = len(dnsPacket.Payload) - - dm.DnsTap.Identity = c.identity - dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second() - dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano()) - - // send DNS message to DNS processor - dnsProcessor.GetChannel() <- dm - - case <-ticker.C: - // Every minute, flush connections that haven't seen activity in the past 2 minutes. - assembler.FlushOlderThan(time.Now().Add(time.Minute * -2)) - } + // for { + for dnsPacket := range dnsChan { + // reset + dm.Init() + + dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String() + dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String() + dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String() + dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String() + dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String() + dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String() + + dm.DNS.Payload = dnsPacket.Payload + dm.DNS.Length = len(dnsPacket.Payload) + + dm.DnsTap.Identity = c.identity + dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second() + dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano()) + + // send DNS message to DNS processor + dnsProcessor.GetChannel() <- dm } }() go func() { buf := make([]byte, 65536) oob := make([]byte, 100) + for { //flags, from bufN, oobn, _, _, err := syscall.Recvmsg(c.fd, buf, oob, 0) @@ -363,37 +357,56 @@ func (c *AfpacketSniffer) Run() { nsec := binary.LittleEndian.Uint32(scm.Data[8:12]) timestamp := time.Unix(int64(tsec), int64(nsec)) - // decode layers - parserLayers.DecodeLayers(buf[:bufN], &decodedLayers) - if len(ip4.Contents) > 0 { - ipFlow = ip4.NetworkFlow() + // copy packet data from buffer + pkt := make([]byte, bufN) + copy(pkt, buf[:bufN]) + + // decode minimal layers + packet := gopacket.NewPacket(pkt, netDecoder, gopacket.NoCopy) + packet.Metadata().CaptureLength = len(packet.Data()) + packet.Metadata().Length = len(packet.Data()) + packet.Metadata().Timestamp = timestamp + + // some security checks + if packet.NetworkLayer() == nil { + continue } - if len(ip6.Contents) > 0 { - ipFlow = ip6.NetworkFlow() + if packet.TransportLayer() == nil { + continue } - for _, layerType := range decodedLayers { - if layerType == layers.LayerTypeUDP { - reassembleChan <- DnsDataStruct{ - Payload: udp.Payload, - IpLayer: ipFlow, - TransportLayer: udp.TransportFlow(), - Timestamp: timestamp, - } + // ipv4 fragmented packet ? + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := packet.NetworkLayer().(*layers.IPv4) + if ip4.Flags&layers.IPv4MoreFragments == 1 || ip4.FragOffset > 0 { + fragIp4Chan <- packet + continue } - if layerType == layers.LayerTypeTCP { - assembler.AssembleWithTimestamp( - ipFlow, - &tcp, - timestamp, - ) + } + + // ipv6 fragmented packet ? + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + v6frag := packet.Layer(layers.LayerTypeIPv6Fragment) + if v6frag != nil { + fragIp6Chan <- packet + continue } } + + // tcp or udp packets ? + if packet.TransportLayer().LayerType() == layers.LayerTypeUDP { + udpChan <- packet + } + + if packet.TransportLayer().LayerType() == layers.LayerTypeTCP { + tcpChan <- packet + } } }() <-c.exit + close(dnsChan) // stop dns processor dnsProcessor.Stop() diff --git a/config.yml b/config.yml index 4b773020..76c58bdf 100644 --- a/config.yml +++ b/config.yml @@ -52,6 +52,8 @@ global: # - ra: recursion available # - ad: authenticated data # - edns-csubnet: client subnet + # - df: flag when ip defragemtation occured + # - tr: flag when tcp reassembled occured text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency" # create your dns collector, please refer bellow to see the list diff --git a/dnsutils/message.go b/dnsutils/message.go index 4bd52643..50594a8b 100644 --- a/dnsutils/message.go +++ b/dnsutils/message.go @@ -77,12 +77,14 @@ type DnsGeo struct { } type DnsNetInfo struct { - Family string `json:"family" msgpack:"family"` - Protocol string `json:"protocol" msgpack:"protocol"` - QueryIp string `json:"query-ip" msgpack:"query-ip"` - QueryPort string `json:"query-port" msgpack:"query-port"` - ResponseIp string `json:"response-ip" msgpack:"response-ip"` - ResponsePort string `json:"response-port" msgpack:"response-port"` + Family string `json:"family" msgpack:"family"` + Protocol string `json:"protocol" msgpack:"protocol"` + QueryIp string `json:"query-ip" msgpack:"query-ip"` + QueryPort string `json:"query-port" msgpack:"query-port"` + ResponseIp string `json:"response-ip" msgpack:"response-ip"` + ResponsePort string `json:"response-port" msgpack:"response-port"` + IpDefragmented bool `json:"ip-defragmented" msgpack:"ip-defragmented"` + TcpReassembled bool `json:"tcp-reassembled" msgpack:"tcp-reassembled"` } type DnsRRs struct { @@ -170,12 +172,14 @@ type DnsMessage struct { func (dm *DnsMessage) Init() { dm.NetworkInfo = DnsNetInfo{ - Family: "-", - Protocol: "-", - QueryIp: "-", - QueryPort: "-", - ResponseIp: "-", - ResponsePort: "-", + Family: "-", + Protocol: "-", + QueryIp: "-", + QueryPort: "-", + ResponseIp: "-", + ResponsePort: "-", + IpDefragmented: false, + TcpReassembled: false, } dm.DnsTap = DnsTap{ @@ -401,6 +405,18 @@ func (dm *DnsMessage) Bytes(format []string, delimiter string) []byte { s.WriteString(dm.DNS.Type) case directive == "opcode": s.WriteString(strconv.Itoa(dm.DNS.Opcode)) + case directive == "tr": + if dm.NetworkInfo.TcpReassembled { + s.WriteString("TR") + } else { + s.WriteString("-") + } + case directive == "df": + if dm.NetworkInfo.IpDefragmented { + s.WriteString("DF") + } else { + s.WriteString("-") + } case directive == "tc": if dm.DNS.Flags.TC { s.WriteString("TC") diff --git a/doc/configuration.md b/doc/configuration.md index 7c420d0e..5e8008ea 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -93,6 +93,8 @@ Default directives: - `aa`: flag authoritative answer - `ra`: flag recursion available - `ad`: flag authenticated data +- `df`: flag when ip defragmented occured +- `tr`: flag when tcp reassembled occured - `edns-csubnet`: display client subnet info ```yaml diff --git a/doc/metrics.txt b/doc/metrics.txt index 03fbbefa..09b09085 100644 --- a/doc/metrics.txt +++ b/doc/metrics.txt @@ -16,9 +16,9 @@ dnscollector_nxdomains_total{stream_id="dnsdist-cache"} 1 dnscollector_nxdomains_total{stream_id="dnsdist1"} 1 # HELP dnscollector_packets_total Counter of packets # TYPE dnscollector_packets_total counter -dnscollector_packets_total{flag_aa="false",flag_ad="false",flag_qr="REPLY",flag_ra="true",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_RESPONSE",pkt_err="false",query_type="A",return_code="NXDOMAIN",stream_id="dnsdist-cache"} 2 -dnscollector_packets_total{flag_aa="false",flag_ad="false",flag_qr="REPLY",flag_ra="true",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_RESPONSE",pkt_err="false",query_type="A",return_code="NXDOMAIN",stream_id="dnsdist1"} 2 -dnscollector_packets_total{flag_aa="false",flag_ad="true",flag_qr="QUERY",flag_ra="false",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_QUERY",pkt_err="false",query_type="A",return_code="NOERROR",stream_id="dnsdist1"} 4 +dnscollector_packets_total{flag_aa="false",flag_ad="false",flag_qr="REPLY",flag_ra="true",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_RESPONSE",pkt_err="false",query_type="A",return_code="NXDOMAIN",stream_id="dnsdist-cache", flag_df="false", flag_tr="false"} 2 +dnscollector_packets_total{flag_aa="false",flag_ad="false",flag_qr="REPLY",flag_ra="true",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_RESPONSE",pkt_err="false",query_type="A",return_code="NXDOMAIN",stream_id="dnsdist1", flag_df="false", flag_tr="false"} 2 +dnscollector_packets_total{flag_aa="false",flag_ad="true",flag_qr="QUERY",flag_ra="false",flag_tc="false",net_family="INET",net_transport="UDP",op_code="0",op_name="CLIENT_QUERY",pkt_err="false",query_type="A",return_code="NOERROR",stream_id="dnsdist1", flag_df="false", flag_tr="false"} 4 # HELP dnscollector_qnames_size_bytes Size of the qname in bytes. # TYPE dnscollector_qnames_size_bytes histogram dnscollector_qnames_size_bytes_bucket{stream_id="dnsdist-cache",le="10"} 0 diff --git a/loggers/prometheus.go b/loggers/prometheus.go index f67ea587..5570862f 100644 --- a/loggers/prometheus.go +++ b/loggers/prometheus.go @@ -265,7 +265,10 @@ func (o *Prometheus) InitProm() { "flag_aa", "flag_ra", "flag_ad", - "pkt_err"}, + "pkt_err", + "flag_df", + "flag_tr", + }, ) o.promRegistry.MustRegister(o.counterPackets) @@ -500,6 +503,8 @@ func (o *Prometheus) Record(dm dnsutils.DnsMessage) { strconv.FormatBool(dm.DNS.Flags.RA), strconv.FormatBool(dm.DNS.Flags.AD), strconv.FormatBool(dm.DNS.MalformedPacket), + strconv.FormatBool(dm.NetworkInfo.IpDefragmented), + strconv.FormatBool(dm.NetworkInfo.TcpReassembled), ).Inc() // count the number of queries and replies diff --git a/netlib/ipdefrag.go b/netlib/ipdefrag.go new file mode 100644 index 00000000..1103a6aa --- /dev/null +++ b/netlib/ipdefrag.go @@ -0,0 +1,386 @@ +package netlib + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +const ( + IPv6MinimumFragmentSize = 1280 + IPv6MaximumSize = 65535 + IPv6MaximumFragmentOffset = 8189 + IPv6MaximumFragmentListLen = 52 + + IPv4MinimumFragmentSize = 8 // Minimum size of a single fragment + IPv4MaximumSize = 65535 // Maximum size of a fragment (2^16) + IPv4MaximumFragmentOffset = 8183 // Maximum offset of a fragment + IPv4MaximumFragmentListLen = 8192 // Back out if we get more than this many fragments +) + +type fragments struct { + List list.List + Highest uint16 + Current uint16 + LastSeen time.Time +} + +func (f *fragments) insert(in gopacket.Packet) (gopacket.Packet, error) { + var inFragOffset uint16 + var inFragLength uint16 + var inFragMore bool + + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + inIp6 := in.Layer(layers.LayerTypeIPv6).(*layers.IPv6) + inFrag6 := in.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + inFragOffset = inFrag6.FragmentOffset * 8 + inFragLength = inIp6.Length - 8 + inFragMore = inFrag6.MoreFragments + } + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + inIp4 := in.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + inFragOffset = inIp4.FragOffset * 8 + inFragLength = inIp4.Length - 20 + inFragMore = inIp4.Flags&layers.IPv4MoreFragments > 0 + } + + if inFragOffset >= f.Highest { + f.List.PushBack(in) + } else { + for e := f.List.Front(); e != nil; e = e.Next() { + packet, _ := e.Value.(gopacket.Packet) + + var fragOffset uint16 + + frag6 := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + ip4, _ := e.Value.(*layers.IPv4) + if frag6 != nil { + fragOffset = frag6.FragmentOffset * 8 + } else { + fragOffset = ip4.FragOffset * 8 + } + + if inFragOffset == fragOffset { + return nil, nil + } + if inFragOffset <= fragOffset { + f.List.InsertBefore(in, e) + break + } + } + } + + f.LastSeen = in.Metadata().Timestamp + + // After inserting the Fragment, we update the counters + if f.Highest < inFragOffset+inFragLength { + f.Highest = inFragOffset + inFragLength + } + f.Current = f.Current + inFragLength + + // Final Fragment ? + if !inFragMore && f.Highest == f.Current { + return f.build(in) + } + return nil, nil +} + +func (f *fragments) build(in gopacket.Packet) (gopacket.Packet, error) { + var final []byte + var currentOffset uint16 + + for e := f.List.Front(); e != nil; e = e.Next() { + pack, _ := e.Value.(gopacket.Packet) + + var fragOffset uint16 + var fragLength uint16 + var fragPayload []byte + var ipOffset uint16 + + if pack.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + frag6 := pack.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + ip6 := pack.Layer(layers.LayerTypeIPv6).(*layers.IPv6) + + fragOffset = frag6.FragmentOffset + fragLength = ip6.Length + fragPayload = frag6.Payload + ipOffset = 8 + } + if pack.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := pack.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + + fragOffset = ip4.FragOffset + fragLength = ip4.Length + fragPayload = ip4.Payload + ipOffset = 20 + } + + if fragOffset*8 == currentOffset { + final = append(final, fragPayload...) + currentOffset = currentOffset + fragLength - ipOffset + + } else if fragOffset*8 < currentOffset { + startAt := currentOffset - fragOffset*8 + if startAt > fragLength-ipOffset { + return nil, fmt.Errorf("defrag: invalid fragment") + } + final = append(final, fragPayload[startAt:]...) + currentOffset = currentOffset + fragOffset*8 + + } else { + // Houston - we have an hole ! + return nil, fmt.Errorf("defrag: hole found") + } + } + + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := in.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + out := &layers.IPv4{ + Version: ip4.Version, + IHL: ip4.IHL, + TOS: ip4.TOS, + Length: f.Highest, + Id: ip4.Id, + Flags: 0, + FragOffset: 0, + TTL: ip4.TTL, + Protocol: ip4.Protocol, + Checksum: 0, + SrcIP: ip4.SrcIP, + DstIP: ip4.DstIP, + Options: ip4.Options, + Padding: ip4.Padding, + } + out.Payload = final + + buf := gopacket.NewSerializeBuffer() + ops := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + + ip4Payload, _ := buf.PrependBytes(len(final)) + copy(ip4Payload, final) + out.SerializeTo(buf, ops) + + outPacket := gopacket.NewPacket(buf.Bytes(), layers.LayerTypeIPv4, gopacket.Default) + outPacket.Metadata().CaptureLength = len(outPacket.Data()) + outPacket.Metadata().Length = len(outPacket.Data()) + outPacket.Metadata().Timestamp = in.Metadata().Timestamp + + // workaround to mark the packet as reassembled + outPacket.Metadata().Truncated = true + return outPacket, nil + } + + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + ip6 := in.Layer(layers.LayerTypeIPv6).(*layers.IPv6) + frag6 := in.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + out := &layers.IPv6{ + Version: ip6.Version, + TrafficClass: ip6.TrafficClass, + FlowLabel: ip6.FlowLabel, + Length: f.Highest, + NextHeader: frag6.NextHeader, + HopLimit: ip6.HopLimit, + SrcIP: ip6.SrcIP, + DstIP: ip6.DstIP, + HopByHop: ip6.HopByHop, + } + out.Payload = final + + buf := gopacket.NewSerializeBuffer() + ops := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + + v6Payload, _ := buf.PrependBytes(len(final)) + copy(v6Payload, final) + + out.SerializeTo(buf, ops) + outPacket := gopacket.NewPacket(buf.Bytes(), layers.LayerTypeIPv6, gopacket.Default) + outPacket.Metadata().CaptureLength = len(outPacket.Data()) + outPacket.Metadata().Length = len(outPacket.Data()) + outPacket.Metadata().Timestamp = in.Metadata().Timestamp + + // workaround to mark the packet as reassembled + outPacket.Metadata().Truncated = true + + return outPacket, nil + } + return nil, nil +} + +type ipFlow struct { + flow gopacket.Flow + id uint32 +} + +func newIPv4(packet gopacket.Packet) ipFlow { + ip4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + return ipFlow{ + flow: ip4.NetworkFlow(), + id: uint32(ip4.Id), + } +} + +func newIPv6(packet gopacket.Packet) ipFlow { + frag := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + ip6 := packet.Layer(layers.LayerTypeIPv6).(*layers.IPv6) + return ipFlow{ + flow: ip6.NetworkFlow(), + id: frag.Identification, + } +} + +type IpDefragmenter struct { + sync.RWMutex + ipFlows map[ipFlow]*fragments +} + +func NewIPDefragmenter() *IpDefragmenter { + return &IpDefragmenter{ + ipFlows: make(map[ipFlow]*fragments), + } +} + +func (d *IpDefragmenter) DefragIP(in gopacket.Packet) (gopacket.Packet, error) { + // check if we need to defrag + if st := d.dontDefrag(in); st { + return in, nil + } + + // perfom security checks + if err := d.securityChecks(in); err != nil { + return nil, err + } + + // ok, got a fragment + // have we already seen a flow between src/dst with that Id? + var ipf ipFlow + var fl *fragments + var exist bool + var maxFrag int + + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ipf = newIPv4(in) + maxFrag = IPv4MaximumFragmentListLen + } + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + ipf = newIPv6(in) + maxFrag = IPv6MaximumFragmentListLen + } + d.Lock() + fl, exist = d.ipFlows[ipf] + if !exist { + fl = new(fragments) + d.ipFlows[ipf] = fl + } + d.Unlock() + + // insert, and if final build it + out, err2 := fl.insert(in) + + // at last, if we hit the maximum frag list len + // without any defrag success, we just drop everything and + // raise an error + if out == nil && fl.List.Len()+1 > maxFrag { + d.flush(ipf) + return nil, fmt.Errorf("fragment List hits its maximum") + } + + // if we got a packet, it's a new one, and he is defragmented + // when defrag is done for a flow between two ip clean the list + if out != nil { + d.flush(ipf) + return out, nil + } + return nil, err2 +} + +func (d *IpDefragmenter) dontDefrag(in gopacket.Packet) bool { + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + // check if we need to defrag + frag := in.Layer(layers.LayerTypeIPv6Fragment) + if frag == nil { + return true + } + } + + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := in.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + // don't defrag packet with DF flag + if ip4.Flags&layers.IPv4DontFragment != 0 { + return true + } + // don't defrag not fragmented ones + if ip4.Flags&layers.IPv4MoreFragments == 0 && ip4.FragOffset == 0 { + return true + } + } + + return false +} + +func (d *IpDefragmenter) securityChecks(in gopacket.Packet) error { + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + frag6 := in.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment) + + // don't allow too big fragment offset + if frag6.FragmentOffset > IPv6MaximumFragmentOffset { + return fmt.Errorf("fragment offset too big (handcrafted? %d > %d)", frag6.FragmentOffset, IPv6MaximumFragmentOffset) + } + fragOffset := uint32(frag6.FragmentOffset * 8) + + // don't allow fragment that would oversize an IP packet + if fragOffset+uint32(len(frag6.Payload)) > IPv6MaximumSize { + return fmt.Errorf("fragment will overrun (handcrafted? %d > %d)", fragOffset+uint32(len(frag6.Payload)), IPv6MaximumFragmentOffset) + } + } + if in.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := in.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + fragSize := ip4.Length - uint16(ip4.IHL)*4 + + // don't allow small fragments outside of specification + if fragSize < IPv4MinimumFragmentSize { + return fmt.Errorf("fragment too small(handcrafted? %d < %d)", fragSize, IPv4MinimumFragmentSize) + } + + // don't allow too big fragment offset + if ip4.FragOffset > IPv4MaximumFragmentOffset { + return fmt.Errorf("fragment offset too big (handcrafted? %d > %d)", ip4.FragOffset, IPv4MaximumFragmentOffset) + } + fragOffset := ip4.FragOffset * 8 + + // don't allow fragment that would oversize an IP packet + if fragOffset+ip4.Length > IPv4MaximumSize { + return fmt.Errorf("fragment will overrun (handcrafted? %d > %d)", fragOffset+ip4.Length, IPv4MaximumSize) + } + } + + return nil +} + +func (d *IpDefragmenter) flush(ipf ipFlow) { + d.Lock() + delete(d.ipFlows, ipf) + d.Unlock() +} + +func (d *IpDefragmenter) DiscardOlderThan(t time.Time) int { + var nb int + d.Lock() + for k, v := range d.ipFlows { + if v.LastSeen.Before(t) { + nb = nb + 1 + delete(d.ipFlows, k) + } + } + d.Unlock() + return nb +} diff --git a/netlib/networkdecoder.go b/netlib/networkdecoder.go new file mode 100644 index 00000000..6d7237e0 --- /dev/null +++ b/netlib/networkdecoder.go @@ -0,0 +1,126 @@ +package netlib + +import ( + "fmt" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +type NetDecoder struct{} + +const ( + IPv4ProtocolTCP = layers.IPProtocolTCP + IPv4ProtocolUDP = layers.IPProtocolUDP + IPv6ProtocolTCP = layers.IPProtocolTCP + IPv6ProtocolUDP = layers.IPProtocolUDP + IPv6ProtocolFragment = layers.IPProtocolIPv6Fragment +) + +func (d *NetDecoder) Decode(data []byte, p gopacket.PacketBuilder) error { + // Decode the Ethernet layer + ethernetLayer := &layers.Ethernet{} + if err := ethernetLayer.DecodeFromBytes(data, p); err != nil { + return err + } + p.AddLayer(ethernetLayer) + p.SetLinkLayer(ethernetLayer) + + // Check the EtherType of the Ethernet layer to determine the next layer + switch ethernetLayer.EthernetType { + case layers.EthernetTypeIPv4: + return d.decodeIPv4(ethernetLayer.Payload, p) + case layers.EthernetTypeIPv6: + return d.decodeIPv6(ethernetLayer.Payload, p) + } + + return nil +} + +func (d *NetDecoder) decodeIPv4(data []byte, p gopacket.PacketBuilder) error { + // Decode the IPv4 layer + ipv4Layer := &layers.IPv4{} + if err := ipv4Layer.DecodeFromBytes(data, p); err != nil { + return err + } + p.AddLayer(ipv4Layer) + p.SetNetworkLayer(ipv4Layer) + + // Check the Protocol of the IPv4 layer to determine the next layer + switch ipv4Layer.Protocol { + case IPv4ProtocolTCP: + return d.decodeTCP(ipv4Layer.Payload, p) + case IPv4ProtocolUDP: + return d.decodeUDP(ipv4Layer.Payload, p) + } + + return nil +} + +func (d *NetDecoder) decodeIPv6(data []byte, p gopacket.PacketBuilder) error { + + ipv6Layer := &layers.IPv6{} + if err := ipv6Layer.DecodeFromBytes(data, p); err != nil { + return err + } + p.AddLayer(ipv6Layer) + p.SetNetworkLayer(ipv6Layer) + + // Check the NextHeader of the IPv6 layer to determine the next layer + switch ipv6Layer.NextHeader { + case IPv6ProtocolTCP: + return d.decodeTCP(ipv6Layer.Payload, p) + case IPv6ProtocolUDP: + return d.decodeUDP(ipv6Layer.Payload, p) + case IPv6ProtocolFragment: + return d.decodeIPv6Fragment(ipv6Layer.Payload, p) + } + return nil +} + +func (d *NetDecoder) decodeIPv6Fragment(data []byte, p gopacket.PacketBuilder) error { + // Create a new packet from the byte slice + packet := gopacket.NewPacket(data, layers.LayerTypeIPv6Fragment, gopacket.Default) + + ipv6FragLayer := packet.Layer(layers.LayerTypeIPv6Fragment) + if ipv6FragLayer == nil { + return fmt.Errorf("no ipv6 fragment layer") + } + + p.AddLayer(ipv6FragLayer) + + ipv6Frag := ipv6FragLayer.(*layers.IPv6Fragment) + + // This is the last fragment, so we can decode the payload + switch ipv6Frag.NextHeader { + case layers.IPProtocolTCP: + return d.decodeTCP(ipv6FragLayer.LayerPayload(), p) + case layers.IPProtocolUDP: + return d.decodeUDP(ipv6FragLayer.LayerPayload(), p) + } + return nil +} + +func (d *NetDecoder) decodeTCP(data []byte, p gopacket.PacketBuilder) error { + // Decode the TCP layer + tcpLayer := &layers.TCP{} + if err := tcpLayer.DecodeFromBytes(data, p); err != nil { + return err + } + p.AddLayer(tcpLayer) + p.SetTransportLayer(tcpLayer) + + return nil +} + +func (d *NetDecoder) decodeUDP(data []byte, p gopacket.PacketBuilder) error { + // Decode the UDP layer + udpLayer := &layers.UDP{} + if err := udpLayer.DecodeFromBytes(data, p); err != nil { + return err + } + p.AddLayer(udpLayer) + p.SetTransportLayer(udpLayer) + + return nil +} diff --git a/netlib/networkdecoder_test.go b/netlib/networkdecoder_test.go new file mode 100644 index 00000000..92b9d850 --- /dev/null +++ b/netlib/networkdecoder_test.go @@ -0,0 +1,326 @@ +package netlib + +import ( + "testing" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +func TestNetDecoder_Decode_IPv4_UDP(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x08, 0x00, + // ipv4 + 0x45, 0x00, 0x00, 0x44, 0xe5, 0x6a, 0x00, 0x00, 0x6f, 0x11, + 0xec, 0x11, 0xac, 0xd9, 0x28, 0x4c, 0xc1, 0x18, 0xe3, 0xee, + // udp + 0xdd, 0x68, 0x00, 0x35, 0x00, 0x30, 0x0c, 0x33, + // udp payload (dns) + 0xd4, 0x3f, 0x00, 0x10, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x08, 0x77, + 0x65, 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, 0x64, 0x65, 0x00, 0x00, 0x30, 0x00, + 0x01, 0x00, 0x00, 0x29, 0x10, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv4); !ok { + t.Errorf("Expected IPv4 layer, got %T", packetLayers[1]) + } + ip4 := packetLayers[1].(*layers.IPv4) + if ip4.Flags&layers.IPv4MoreFragments > 0 { + t.Errorf("Expected more fragment") + } + if _, ok := packetLayers[2].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv4_TCP(t *testing.T) { + pkt := []byte{ + //ethernet + 0xb0, 0xbb, 0xe5, 0xb2, 0x46, 0x4c, 0xb0, 0x35, 0x9f, 0xd4, 0x03, 0x91, 0x08, 0x00, + // ipv4 + 0x45, 0x00, 0x00, 0x69, 0xb7, 0x65, 0x40, 0x00, 0x40, 0x06, 0xbf, + 0x6e, 0xc0, 0xa8, 0x01, 0x11, 0x01, 0x01, 0x01, 0x01, + // tcp + 0x8d, 0xcd, 0x00, 0x35, 0x39, 0x4f, 0x0c, 0xbb, 0xcf, 0x72, 0x32, 0xb3, 0x80, 0x18, + 0x01, 0xf6, 0x38, 0xc2, 0x00, 0x00, 0x01, 0x01, 0x08, 0x0a, 0x09, 0x5d, 0x2c, 0x7a, 0x65, 0xe0, + 0x63, 0x90, 0x00, 0x33, 0x85, 0x9f, 0x01, 0x20, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, + 0x06, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x03, 0x63, 0x6f, 0x6d, 0x00, 0x00, 0x01, 0x00, 0x01, + 0x00, 0x00, 0x29, 0x04, 0xd0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x0a, 0x00, 0x08, 0xdf, + 0x41, 0x92, 0x72, 0x53, 0xf5, 0x1b, 0x48, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[2].(*layers.TCP); !ok { + t.Errorf("Expected TCP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv4_MoreFragment(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x08, 0x00, + // ipv4 + 0x45, 0x00, 0x00, 0x44, 0xd0, 0xfe, 0x20, 0x00, 0x40, 0x11, + 0x09, 0xe6, 0xc1, 0x18, 0xe3, 0xee, 0xac, 0xd9, 0x28, 0x4c, + // udp + 0x00, 0x35, 0xdd, 0x68, 0x06, 0xae, 0xb4, 0x63, 0xd4, 0x3f, 0x84, 0x10, 0x00, 0x01, + 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x08, 0x77, 0x65, 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, + 0x64, 0x65, 0x00, 0x00, 0x30, 0x00, 0x01, 0xc0, 0x0c, 0x00, 0x30, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x3c, 0x02, 0x08, 0x01, 0x01, 0x03, 0x0a, 0x03, 0x01, 0x00, 0x01, 0xdd, 0xef, 0xfd, 0xed, 0x22, + 0xad, 0x76, 0x0a, 0x3b, 0x0b, 0x58, 0x10, 0x1d, 0xd5, 0x3d, 0xee, 0xf3, 0xf7, 0xda, 0xaf, 0x8b, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv4); !ok { + t.Errorf("Expected IPv4 layer, got %T", packetLayers[1]) + } + + ip4 := packetLayers[1].(*layers.IPv4) + if ip4.Flags&layers.IPv4MoreFragments != 1 { + t.Errorf("Expected more fragment flag") + } + if _, ok := packetLayers[2].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv4_FragmentOffset(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x08, 0x00, + // ipv4 + 0x45, 0x00, 0x00, 0xfa, 0xd0, 0xfe, 0x00, 0xb9, 0x40, 0x11, 0x2e, 0x0f, 0xc1, 0x18, 0xe3, 0xee, 0xac, 0xd9, + 0x28, 0x4c, + // udp + 0x92, 0x56, 0x69, 0x0f, 0x05, 0x4b, 0xdb, 0x48, 0x1e, 0x8f, 0xa8, 0x56, 0x36, 0x39, + 0xd5, 0xcc, 0xba, 0xf9, 0xf8, 0x22, 0x24, 0xd0, 0x76, 0xcc, 0x24, 0x9b, 0xda, 0x1d, 0x49, 0xf0, + 0x3e, 0x34, 0x44, 0x9c, 0x94, 0x65, 0x87, 0x34, 0x96, 0x0b, 0x8d, 0x1a, 0xb3, 0x33, 0xbe, 0x88, + 0x01, 0x62, 0x76, 0xf1, 0x22, 0x7b, 0x83, 0x28, 0x3d, 0x81, 0xf1, 0x21, 0x9a, 0xba, 0x6c, 0x6c, + 0xca, 0x72, 0x6e, 0x94, 0x14, 0x99, 0x4d, 0xd7, 0xbb, 0xe2, 0x49, 0xee, 0x72, 0x69, 0x3e, 0xee, + 0x0e, 0x03, 0x6c, 0xcd, 0x33, 0xc9, 0xf4, 0x43, 0xd1, 0x6d, 0xd1, 0x84, 0x3d, 0xee, 0xd0, 0xd1, + 0x5d, 0x8e, 0x2f, 0xf4, 0xce, 0x68, 0x88, 0xf3, 0x5e, 0xd5, 0x90, 0x21, 0x36, 0x1a, 0x95, 0x6f, + 0xb8, 0xbd, 0xc5, 0xf0, 0xa0, 0xc2, 0x0b, 0xe1, 0x0c, 0x62, 0x32, 0x65, 0x38, 0x7a, 0x8c, 0xf9, + 0x24, 0xc9, 0xc4, 0xfa, 0xbd, 0x64, 0x5f, 0x31, 0x25, 0xc5, 0x48, 0x4e, 0x40, 0xba, 0x11, 0x8e, + 0x82, 0x75, 0x19, 0x98, 0x99, 0x07, 0x6a, 0xbd, 0x16, 0x16, 0xcc, 0x35, 0xcf, 0x8c, 0x6b, 0x72, + 0xbb, 0x95, 0xd3, 0xd7, 0x71, 0xf5, 0x54, 0x2f, 0x08, 0x26, 0x2b, 0x0d, 0x51, 0xe8, 0x41, 0x0e, + 0xbd, 0x8f, 0x7a, 0x9a, 0x40, 0x35, 0x47, 0x57, 0x16, 0x5c, 0xaa, 0x55, 0x0e, 0xa6, 0x01, 0x12, + 0xfa, 0x52, 0x74, 0xc1, 0x4f, 0x4c, 0x5a, 0x9b, 0xb0, 0xe9, 0x9a, 0xec, 0x72, 0x70, 0xee, 0xc1, + 0x3a, 0xa9, 0x76, 0xac, 0x2e, 0xca, 0x04, 0x96, 0xf8, 0x97, 0x29, 0x20, 0xf4, 0x00, 0x00, 0x29, + 0x10, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv4); !ok { + t.Errorf("Expected IPv4 layer, got %T", packetLayers[1]) + } + + ip4 := packetLayers[1].(*layers.IPv4) + if ip4.FragOffset == 1480 { + t.Errorf("Expected fragment offset equal to 1480") + } + if ip4.Flags&layers.IPv4MoreFragments != 0 { + t.Errorf("Expected no flag for more fragment") + } + + if _, ok := packetLayers[2].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv6_UDP(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x86, 0xdd, + // ipv6 + 0x60, 0x02, 0xb8, 0xfc, 0x00, 0x42, 0x11, 0x6b, 0x2a, 0x00, 0x14, 0x50, 0x40, 0x13, 0x0c, 0x03, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x0a, 0x20, 0x01, 0x04, 0x70, 0x76, 0x5b, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0a, 0x25, 0x00, 0x53, + // udp + 0xb5, 0x61, 0x00, 0x35, 0x00, 0x42, 0xec, 0x92, 0xe9, 0xc4, + 0x00, 0x10, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x70, 0x61, 0x08, 0x77, 0x65, + 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, 0x64, 0x65, 0x00, 0x00, 0x1c, 0x00, 0x01, 0x00, 0x00, + 0x29, 0x10, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x0f, 0x00, 0x08, 0x00, 0x0b, 0x00, 0x02, 0x38, + 0x00, 0x20, 0x01, 0x04, 0x70, 0x1f, 0x0b, 0x16, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv6); !ok { + t.Errorf("Expected IPv6 layer, got %T", packetLayers[1]) + } + if _, ok := packetLayers[2].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv6_TCP(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x0c, 0x29, 0x62, 0x31, 0x2a, 0x00, 0x0c, 0x29, 0x7c, 0xa4, 0xcb, 0x86, 0xdd, + // ipv6 + 0x60, 0x0f, 0x4e, 0xd4, 0x00, 0x56, 0x06, 0x40, 0x20, 0x01, 0x04, 0x70, 0x1f, 0x0b, 0x16, 0xb0, 0x02, 0x0c, + 0x29, 0xff, 0xfe, 0x7c, 0xa4, 0xcb, 0x20, 0x01, 0x04, 0x70, 0x1f, 0x0b, 0x16, 0xb0, 0x00, 0x00, + 0x00, 0x00, 0x0a, 0x26, 0x00, 0x53, + // tcp + 0xdf, 0x01, 0x00, 0x35, 0x21, 0xcd, 0x16, 0x09, 0x5c, 0x07, + 0xf0, 0xa9, 0x80, 0x18, 0x00, 0xbf, 0x8e, 0x81, 0x00, 0x00, 0x01, 0x01, 0x08, 0x0a, 0x84, 0x45, + 0xdf, 0x3b, 0x12, 0x7c, 0xd3, 0xd2, 0x00, 0x34, 0x80, 0xe4, 0x01, 0x20, 0x00, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, 0x08, 0x77, 0x65, 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, 0x64, 0x65, + 0x00, 0x00, 0x30, 0x00, 0x01, 0x00, 0x00, 0x29, 0x10, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x0c, + 0x00, 0x0a, 0x00, 0x08, 0x1b, 0x9a, 0xf6, 0x22, 0xab, 0x2c, 0x97, 0x40, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 3 { + t.Fatalf("Unexpected number of layers: expected 3, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv6); !ok { + t.Errorf("Expected IPv6 layer, got %T", packetLayers[1]) + } + if _, ok := packetLayers[2].(*layers.TCP); !ok { + t.Errorf("Expected TCP layer, got %T", packetLayers[2]) + } +} + +func TestNetDecoder_Decode_IPv6_Fragment(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x86, 0xdd, + // ipv6 + 0x60, 0x07, 0x87, 0xfd, 0x00, 0x28, 0x2c, 0x40, 0x20, 0x01, 0x04, 0x70, 0x76, 0x5b, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0a, 0x25, 0x00, 0x53, 0x2a, 0x00, 0x14, 0x50, 0x40, 0x13, 0x0c, 0x03, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x0a, + // data fragment + 0x11, 0x00, 0x00, 0x01, 0x28, 0x40, 0x3c, 0x0b, 0x00, 0x35, + 0xb5, 0x61, 0x05, 0xe5, 0x14, 0x8e, 0xe9, 0xc4, 0x84, 0x10, 0x00, 0x01, 0x00, 0x02, 0x00, 0x03, + 0x00, 0x09, 0x02, 0x70, 0x61, 0x08, 0x77, 0x65, 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, 0x64, + 0x65, 0x00, 0x00, 0x1c, 0x00, 0x01, 0xc0, 0x0c, 0x00, 0x1c, 0x00, 0x01, 0x00, 0x00, 0x00, 0x3c, + 0x00, 0x10, 0x20, 0x01, 0x04, 0x70, 0x1f, 0x0b, 0x10, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x02, 0xc0, 0x0c, 0x00, 0x2e, 0x00, 0x01, 0x00, 0x00, 0x00, 0x3c, 0x01, 0x1f, 0x00, 0x1c, + 0x0a, 0x03, 0x00, 0x00, 0x00, 0x3c, 0x5d, 0x06, 0x59, 0xfc, 0x5c, 0xde, 0xbe, 0xec, 0x90, 0x47, + 0x08, 0x77, 0x65, 0x62, 0x65, 0x72, 0x6c, 0x61, 0x62, 0x02, 0x64, 0x65, 0x00, 0xb5, 0xa6, 0x75, + 0xcd, 0xf5, 0xa2, 0x41, 0xe3, 0xbc, 0x5c, 0x12, 0x5d, 0x2d, 0xf9, 0x1c, 0x89, 0x3e, 0xbf, 0xe9, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 4 { + t.Fatalf("Unexpected number of layers: expected 4, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv6); !ok { + t.Errorf("Expected IPv6 layer, got %T", packetLayers[1]) + } + if _, ok := packetLayers[2].(*layers.IPv6Fragment); !ok { + t.Errorf("Expected IPv6 framgment layer, got %T", packetLayers[2]) + } + if _, ok := packetLayers[3].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[3]) + } +} + +func TestNetDecoder_Decode_IPv6_EndFragment(t *testing.T) { + pkt := []byte{ + // ethernet + 0x00, 0x86, 0x9c, 0xe7, 0x55, 0x14, 0x00, 0x0c, 0x29, 0x8a, 0x5d, 0xd7, 0x86, 0xdd, + // ipv6 + 0x60, 0x07, 0x87, 0xfd, 0x00, 0x45, 0x2c, 0x40, 0x20, 0x01, 0x04, 0x70, 0x76, 0x5b, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0a, 0x25, 0x00, 0x53, 0x2a, 0x00, 0x14, 0x50, 0x40, 0x13, 0x0c, 0x03, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x0a, 0x11, 0x00, 0x05, 0xa8, 0x28, 0x40, 0x3c, 0x0b, + // udp payload + 0x5d, 0x7a, 0xb6, 0x6a, 0x1c, 0xea, 0x61, 0x8d, 0x79, 0x65, 0x32, 0x4f, 0x2c, 0x1e, 0xcc, 0x06, 0x91, 0x26, + 0x9a, 0x0e, 0x84, 0x7f, 0x00, 0xbf, 0x5b, 0xa9, 0x29, 0xc8, 0x49, 0x05, 0xca, 0x72, 0x79, 0xec, + 0xe6, 0x00, 0x00, 0x29, 0x10, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x0f, 0x00, 0x08, 0x00, 0x0b, + 0x00, 0x02, 0x38, 0x00, 0x20, 0x01, 0x04, 0x70, 0x1f, 0x0b, 0x16, + } + + decoder := &NetDecoder{} + + packet := gopacket.NewPacket(pkt, decoder, gopacket.NoCopy) + + packetLayers := packet.Layers() + if len(packetLayers) != 4 { + t.Fatalf("Unexpected number of layers: expected 4, got %d", len(packetLayers)) + } + + if _, ok := packetLayers[0].(*layers.Ethernet); !ok { + t.Errorf("Expected Ethernet layer, got %T", packetLayers[0]) + } + if _, ok := packetLayers[1].(*layers.IPv6); !ok { + t.Errorf("Expected IPv6 layer, got %T", packetLayers[1]) + } + if _, ok := packetLayers[2].(*layers.IPv6Fragment); !ok { + t.Errorf("Expected IPv6 framgment layer, got %T", packetLayers[2]) + } + if _, ok := packetLayers[3].(*layers.UDP); !ok { + t.Errorf("Expected UDP layer, got %T", packetLayers[3]) + } +} diff --git a/netlib/packetproccesor.go b/netlib/packetproccesor.go new file mode 100644 index 00000000..5be29664 --- /dev/null +++ b/netlib/packetproccesor.go @@ -0,0 +1,106 @@ +package netlib + +import ( + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/tcpassembly" +) + +// DefragPacket is a struct that holds DNS data +type DnsPacket struct { + // DNS payload + Payload []byte + // IP layer + IpLayer gopacket.Flow + // Transport layer + TransportLayer gopacket.Flow + // Timestamp + Timestamp time.Time + // IP Defragmented + IpDefragmented bool + // TCP reassembly + TcpReassembled bool +} + +func UdpProcessor(udpInput chan gopacket.Packet, dnsOutput chan DnsPacket, portFilter int) { + for packet := range udpInput { + p := packet.TransportLayer().(*layers.UDP) + + if portFilter > 0 { + if int(p.SrcPort) != portFilter && int(p.DstPort) != portFilter { + continue + } + } + + dnsOutput <- DnsPacket{ + Payload: p.Payload, + IpLayer: packet.NetworkLayer().NetworkFlow(), + TransportLayer: p.TransportFlow(), + Timestamp: packet.Metadata().Timestamp, + TcpReassembled: false, + IpDefragmented: packet.Metadata().Truncated, + } + } +} + +func TcpAssembler(tcpInput chan gopacket.Packet, dnsOutput chan DnsPacket, portFilter int) { + streamFactory := &DnsStreamFactory{Reassembled: dnsOutput} + streamPool := tcpassembly.NewStreamPool(streamFactory) + assembler := tcpassembly.NewAssembler(streamPool) + + ticker := time.NewTicker(time.Minute * 1) + + for { + select { + case packet, more := <-tcpInput: + if !more { + goto FLUSHALL + } + p := packet.TransportLayer().(*layers.TCP) + + // ip fragments should not happened with tcp ... + if packet.Metadata().Truncated { + streamFactory.IpDefragmented = packet.Metadata().Truncated + } + + // ignore packet ? + if portFilter > 0 { + if int(p.SrcPort) != portFilter && int(p.DstPort) != portFilter { + continue + } + } + + assembler.AssembleWithTimestamp( + packet.NetworkLayer().NetworkFlow(), + packet.TransportLayer().(*layers.TCP), + packet.Metadata().Timestamp, + ) + case <-ticker.C: + // Every minute, flush connections that haven't seen activity in the past 2 minutes. + assembler.FlushOlderThan(time.Now().Add(time.Minute * -2)) + } + } +FLUSHALL: + assembler.FlushAll() +} + +func IpDefragger(ipInput chan gopacket.Packet, udpOutput chan gopacket.Packet, tcpOutput chan gopacket.Packet) { + defragger := NewIPDefragmenter() + for fragment := range ipInput { + reassembled, err := defragger.DefragIP(fragment) + if err != nil { + break + } else if reassembled == nil { + continue + } else { + if reassembled.TransportLayer() != nil && reassembled.TransportLayer().LayerType() == layers.LayerTypeUDP { + udpOutput <- reassembled + } + if reassembled.TransportLayer() != nil && reassembled.TransportLayer().LayerType() == layers.LayerTypeTCP { + tcpOutput <- reassembled + } + } + } +} diff --git a/netlib/packetprocessor_test.go b/netlib/packetprocessor_test.go new file mode 100644 index 00000000..5dca9eff --- /dev/null +++ b/netlib/packetprocessor_test.go @@ -0,0 +1,110 @@ +package netlib + +import ( + "os" + "testing" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" +) + +func Test_IpDefrag(t *testing.T) { + tests := []struct { + name string + pcapFile string + nbPackets int + }{ + { + name: "DNS UDP with IPv4 Fragmented", + pcapFile: "./../testsdata/pcap/dnsdump_ip4_fragmented+udp.pcap", + nbPackets: 2, + }, + + { + name: "DNS UDP with IPv6 Fragmented", + pcapFile: "./../testsdata/pcap/dnsdump_ip6_fragmented+udp.pcap", + nbPackets: 2, + }, + } + + done := make(chan bool) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + f, err := os.Open(tc.pcapFile) + if err != nil { + t.Errorf("unable to open file: %s", err) + return + } + defer f.Close() + + pcapHandler, err := pcapgo.NewReader(f) + if err != nil { + t.Errorf("unable to open pcap file: %s", err) + return + } + + fragIp4Chan := make(chan gopacket.Packet) + fragIp6Chan := make(chan gopacket.Packet) + outputChan := make(chan gopacket.Packet, 2) + + // defrag ipv4 + go IpDefragger(fragIp4Chan, outputChan, outputChan) + // defrag ipv6 + go IpDefragger(fragIp6Chan, outputChan, outputChan) + + packetSource := gopacket.NewPacketSource(pcapHandler, pcapHandler.LinkType()) + packetSource.DecodeOptions.Lazy = true + + nbPackets := 0 + timeout := time.After(1 * time.Second) + go func() { + + for { + select { + case <-outputChan: + nbPackets++ + case <-timeout: + goto STOP + } + } + STOP: + done <- true + }() + + for { + packet, err := packetSource.NextPacket() + if err != nil { + break + } + + // ipv4 fragmented packet ? + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv4 { + ip4 := packet.NetworkLayer().(*layers.IPv4) + if ip4.Flags&layers.IPv4MoreFragments == 1 || ip4.FragOffset > 0 { + fragIp4Chan <- packet + } else { + outputChan <- packet + } + } + + if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv6 { + v6frag := packet.Layer(layers.LayerTypeIPv6Fragment) + if v6frag != nil { + fragIp6Chan <- packet + } else { + outputChan <- packet + } + } + + } + + <-done + + if nbPackets != tc.nbPackets { + t.Errorf("bad number of packets, wants: %d, got: %d", tc.nbPackets, nbPackets) + } + }) + } +} diff --git a/collectors/setsock.go b/netlib/setsock.go similarity index 98% rename from collectors/setsock.go rename to netlib/setsock.go index 18905246..1f90122f 100644 --- a/collectors/setsock.go +++ b/netlib/setsock.go @@ -1,7 +1,7 @@ //go:build linux || darwin // +build linux darwin -package collectors +package netlib import ( "crypto/tls" diff --git a/collectors/setsock_windows.go b/netlib/setsock_windows.go similarity index 98% rename from collectors/setsock_windows.go rename to netlib/setsock_windows.go index 67aaa095..f6ee81d1 100644 --- a/collectors/setsock_windows.go +++ b/netlib/setsock_windows.go @@ -1,7 +1,7 @@ //go:build windows // +build windows -package collectors +package netlib import ( "crypto/tls" diff --git a/collectors/tcppacket_reader.go b/netlib/tcpassembly.go similarity index 75% rename from collectors/tcppacket_reader.go rename to netlib/tcpassembly.go index 7bd5f09c..9f62642b 100644 --- a/collectors/tcppacket_reader.go +++ b/netlib/tcpassembly.go @@ -1,4 +1,4 @@ -package collectors +package netlib import ( "bytes" @@ -9,29 +9,19 @@ import ( "github.com/google/gopacket/tcpassembly" ) -// DnsDataStruct is a struct that holds DNS data -type DnsDataStruct struct { - // DNS payload - Payload []byte - // IP layer - IpLayer gopacket.Flow - // Transport layer - TransportLayer gopacket.Flow - // Timestamp - Timestamp time.Time -} - type DnsStreamFactory struct { // Channel to send reassembled DNS data - reassembled chan DnsDataStruct + Reassembled chan DnsPacket + IpDefragmented bool } func (s *DnsStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream { return &stream{ - net: net, - transport: transport, - data: make([]byte, 0), - reassembled: s.reassembled, + net: net, + transport: transport, + data: make([]byte, 0), + reassembled: s.Reassembled, + ipDefragmented: s.IpDefragmented, } } @@ -40,7 +30,9 @@ type stream struct { data []byte lenDns int LastSeen time.Time - reassembled chan DnsDataStruct + reassembled chan DnsPacket + tcpReassembled bool + ipDefragmented bool } func (s *stream) Reassembled(rs []tcpassembly.Reassembly) { @@ -66,24 +58,28 @@ func (s *stream) Reassembled(rs []tcpassembly.Reassembly) { // Convert the length of the DNS message from the buffer to a uint s.lenDns = int(uint(lenBuf[0])<<8 | uint(lenBuf[1])) - + s.tcpReassembled = false } if len(s.data) == s.lenDns+2 { s.LastSeen = r.Seen // send the reassembled data to the channel - s.reassembled <- DnsDataStruct{ + s.reassembled <- DnsPacket{ Payload: s.data[2 : s.lenDns+2], IpLayer: s.net, TransportLayer: s.transport, Timestamp: s.LastSeen, + IpDefragmented: s.ipDefragmented, + TcpReassembled: s.tcpReassembled, } //Reset the buffer. s.data = s.data[s.lenDns+2:] s.lenDns = 0 + } else { + s.tcpReassembled = true } } } diff --git a/collectors/tcppacket_reader_test.go b/netlib/tcpassembly_test.go similarity index 91% rename from collectors/tcppacket_reader_test.go rename to netlib/tcpassembly_test.go index 1bf154de..ebf09010 100644 --- a/collectors/tcppacket_reader_test.go +++ b/netlib/tcpassembly_test.go @@ -1,4 +1,4 @@ -package collectors +package netlib import ( "os" @@ -10,7 +10,7 @@ import ( "github.com/google/gopacket/tcpassembly" ) -func Test_TcpPacketReader(t *testing.T) { +func Test_TcpAssembly(t *testing.T) { tests := []struct { name string pcapFile string @@ -63,8 +63,8 @@ func Test_TcpPacketReader(t *testing.T) { return } - reassembleChan := make(chan DnsDataStruct) - streamFactory := &DnsStreamFactory{reassembled: reassembleChan} + reassembleChan := make(chan DnsPacket) + streamFactory := &DnsStreamFactory{Reassembled: reassembleChan} streamPool := tcpassembly.NewStreamPool(streamFactory) assembler := tcpassembly.NewAssembler(streamPool) @@ -92,7 +92,7 @@ func Test_TcpPacketReader(t *testing.T) { if packet.TransportLayer().LayerType() == layers.LayerTypeUDP { p := packet.TransportLayer().(*layers.UDP) - reassembleChan <- DnsDataStruct{ + reassembleChan <- DnsPacket{ Payload: p.Payload, IpLayer: packet.NetworkLayer().NetworkFlow(), TransportLayer: p.TransportFlow(), @@ -108,7 +108,7 @@ func Test_TcpPacketReader(t *testing.T) { } } // send empty packet to stop the goroutine - reassembleChan <- DnsDataStruct{} + reassembleChan <- DnsPacket{} <-done if nbPackets != tc.nbPackets { diff --git a/testsdata/coredns_tcp.conf b/testsdata/coredns/coredns_tcp.conf similarity index 100% rename from testsdata/coredns_tcp.conf rename to testsdata/coredns/coredns_tcp.conf diff --git a/testsdata/coredns_unix.conf b/testsdata/coredns/coredns_unix.conf similarity index 100% rename from testsdata/coredns_unix.conf rename to testsdata/coredns/coredns_unix.conf diff --git a/testsdata/knotresolver_unix.conf b/testsdata/knotresolver/knotresolver_unix.conf similarity index 100% rename from testsdata/knotresolver_unix.conf rename to testsdata/knotresolver/knotresolver_unix.conf diff --git a/testsdata/pcap/dnsdump_ip4_fragmented+udp.pcap b/testsdata/pcap/dnsdump_ip4_fragmented+udp.pcap new file mode 100644 index 00000000..f204a697 Binary files /dev/null and b/testsdata/pcap/dnsdump_ip4_fragmented+udp.pcap differ diff --git a/testsdata/pcap/dnsdump_ip6_fragmented+udp.pcap b/testsdata/pcap/dnsdump_ip6_fragmented+udp.pcap new file mode 100644 index 00000000..86a441ab Binary files /dev/null and b/testsdata/pcap/dnsdump_ip6_fragmented+udp.pcap differ diff --git a/testsdata/dnsdist_dnstaptcp.conf b/testsdata/powerdns/dnsdist_dnstaptcp.conf similarity index 100% rename from testsdata/dnsdist_dnstaptcp.conf rename to testsdata/powerdns/dnsdist_dnstaptcp.conf diff --git a/testsdata/dnsdist_dnstapunix.conf b/testsdata/powerdns/dnsdist_dnstapunix.conf similarity index 100% rename from testsdata/dnsdist_dnstapunix.conf rename to testsdata/powerdns/dnsdist_dnstapunix.conf diff --git a/testsdata/dnsdist_protobuf.conf b/testsdata/powerdns/dnsdist_protobuf.conf similarity index 100% rename from testsdata/dnsdist_protobuf.conf rename to testsdata/powerdns/dnsdist_protobuf.conf diff --git a/testsdata/dnsdist_protobuf_metadata.conf b/testsdata/powerdns/dnsdist_protobuf_metadata.conf similarity index 100% rename from testsdata/dnsdist_protobuf_metadata.conf rename to testsdata/powerdns/dnsdist_protobuf_metadata.conf diff --git a/testsdata/pdns_recursor.conf b/testsdata/powerdns/pdns_recursor.conf similarity index 100% rename from testsdata/pdns_recursor.conf rename to testsdata/powerdns/pdns_recursor.conf diff --git a/testsdata/pdns_recursor.lua b/testsdata/powerdns/pdns_recursor.lua similarity index 100% rename from testsdata/pdns_recursor.lua rename to testsdata/powerdns/pdns_recursor.lua diff --git a/testsdata/unbound_tcp.conf b/testsdata/unbound/unbound_tcp.conf similarity index 100% rename from testsdata/unbound_tcp.conf rename to testsdata/unbound/unbound_tcp.conf