Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ip defrag and tcp assembly #241

Merged
merged 15 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/testing-dnstap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ jobs:
with:
go-version: ${{ matrix.go-version }}
- uses: actions/setup-python@v4
with:
python-version: 3.11

- name: build binary
run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go

- name: Deploy docker image
run: |
sudo docker run -d --network="host" --name=unbound --volume=$PWD/testsdata/unbound_${{ matrix.mode }}.conf:/opt/unbound/etc/unbound/unbound.conf:z -v /tmp/:/opt/unbound/etc/unbound/tmp/:z mvance/unbound:${{ matrix.unbound }}
sudo docker run -d --network="host" --name=unbound --volume=$PWD/testsdata/unbound/unbound_${{ matrix.mode }}.conf:/opt/unbound/etc/unbound/unbound.conf:z -v /tmp/:/opt/unbound/etc/unbound/tmp/:z mvance/unbound:${{ matrix.unbound }}
until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done

- name: Test ${{ matrix.mode }}
Expand All @@ -57,13 +59,15 @@ jobs:
with:
go-version: ${{ matrix.go-version }}
- uses: actions/setup-python@v4
with:
python-version: 3.11

- name: build binary
run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go

- name: Deploy coredns docker image
run: |
sudo docker run -d --network="host" --name=coredns -v $PWD/testsdata/:$PWD/testsdata/ -v /tmp/:/tmp/ coredns/coredns:${{ matrix.coredns }} -conf $PWD/testsdata/coredns_${{ matrix.mode }}.conf
sudo docker run -d --network="host" --name=coredns -v $PWD/testsdata/:$PWD/testsdata/ -v /tmp/:/tmp/ coredns/coredns:${{ matrix.coredns }} -conf $PWD/testsdata/coredns/coredns_${{ matrix.mode }}.conf
until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done

- name: Test ${{ matrix.mode }}
Expand All @@ -90,6 +94,8 @@ jobs:
with:
go-version: ${{ matrix.go-version }}
- uses: actions/setup-python@v4
with:
python-version: 3.11

- name: build binary
run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o go-dnscollector *.go
Expand All @@ -101,7 +107,7 @@ jobs:

- name: Deploy dnsdist docker image
run: |
sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/dnsdist_${{ matrix.mode }}.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }}
sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/powerdns/dnsdist_${{ matrix.mode }}.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }}
until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done

- name: Test ${{ matrix.mode }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/testing-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
matrix:
os-version: ['ubuntu-22.04', 'macos-12']
go-version: ['1.19', '1.20']
package: ['dnsutils', 'collectors', 'loggers', 'transformers']
package: ['dnsutils', 'collectors', 'loggers', 'transformers', 'netlib']

runs-on: ${{ matrix.os-version }}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/testing-powerdns.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:

- name: Deploy dnsdist docker image
run: |
sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/dnsdist_protobuf.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }}
sudo docker run -d --network="host" --name=dnsdist --volume=$PWD/testsdata/powerdns/dnsdist_protobuf.conf:/etc/dnsdist/conf.d/dnsdist.conf:z -v /tmp/:/tmp/ powerdns/dnsdist-${{ matrix.dnsdist }}
until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done

- name: Test dns query
Expand Down Expand Up @@ -60,7 +60,7 @@ jobs:

- name: Deploy recursor docker image
run: |
sudo docker run -d --network="host" --name=recursor --volume=$PWD/testsdata/pdns_recursor.lua:/etc/powerdns/recursor.lua:z --volume=$PWD/testsdata/pdns_recursor.conf:/etc/powerdns/recursor.conf:z powerdns/pdns-recursor-${{ matrix.recursor }}
sudo docker run -d --network="host" --name=recursor --volume=$PWD/testsdata/powerdns/pdns_recursor.lua:/etc/powerdns/recursor.lua:z --volume=$PWD/testsdata/powerdns/pdns_recursor.conf:/etc/powerdns/recursor.conf:z powerdns/pdns-recursor-${{ matrix.recursor }}
until (dig -p 5553 www.github.com @127.0.0.1 | grep NOERROR); do sleep 5.0; done

- name: Test send query
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
`DNS-collector` acts as a passive high speed **aggregator, analyzer, transporter and logging** for your DNS messages, written in **Golang**. The DNS traffic can be collected and aggregated from simultaneously [sources](doc/collectors.md) like DNStap streams, network interface or log files and relays it to multiple other [listeners](doc/loggers.md) with some [transformations](doc/transformers.md) on it ([traffic filtering](doc/transformers.md#dns-filtering), [user privacy](doc/transformers.md#user-privacy), ...) and DNS protocol conversions (to [plain text](doc/configuration.md#custom-text-format), [json](doc/dnsjson.md), and more... ).

Additionally, DNS-collector also support
- [`EDNS`](doc/dnsparser.md) parser
- [Extension Mechanisms for DNS (EDNS)](doc/dnsparser.md) decoding
- IPv4/v6 defragmentation and TCP reassembly
- Nanoseconds in timestamps
- TCP reassembly
- IPv4 and IPv6 addresses

**Overview**:

Expand Down
3 changes: 2 additions & 1 deletion collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/netlib"
"github.com/dmachard/go-framestream"
"github.com/dmachard/go-logger"
)
Expand Down Expand Up @@ -228,7 +229,7 @@ func (c *Dnstap) Run() {
is_tls = true
}

before, actual, err := SetSock_RCVBUF(conn, c.config.Collectors.Dnstap.RcvBufSize, is_tls)
before, actual, err := netlib.SetSock_RCVBUF(conn, c.config.Collectors.Dnstap.RcvBufSize, is_tls)
if err != nil {
c.logger.Fatal("Unable to set SO_RCVBUF: ", err)
}
Expand Down
154 changes: 89 additions & 65 deletions collectors/file_ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
"github.com/dmachard/go-dnscollector/netlib"
"github.com/dmachard/go-logger"
framestream "github.com/farsightsec/golang-framestream"
"github.com/fsnotify/fsnotify"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
"github.com/google/gopacket/tcpassembly"
)

var waitFor = 10 * time.Second
Expand Down Expand Up @@ -158,105 +158,129 @@ func (c *FileIngestor) ProcessPcap(filePath string) {
return
}

reassembleChan := make(chan DnsDataStruct)

streamFactory := &DnsStreamFactory{reassembled: reassembleChan}
streamPool := tcpassembly.NewStreamPool(streamFactory)
assembler := tcpassembly.NewAssembler(streamPool)
dnsChan := make(chan netlib.DnsPacket)
udpChan := make(chan gopacket.Packet)
tcpChan := make(chan gopacket.Packet)
fragIp4Chan := make(chan gopacket.Packet)
fragIp6Chan := make(chan gopacket.Packet)

packetSource := gopacket.NewPacketSource(pcapHandler, pcapHandler.LinkType())
packetSource.DecodeOptions.Lazy = true
packetSource.NoCopy = true

done := make(chan bool)
// defrag ipv4
go netlib.IpDefragger(fragIp4Chan, udpChan, tcpChan)
// defrag ipv6
go netlib.IpDefragger(fragIp6Chan, udpChan, tcpChan)
// tcp assembly
go netlib.TcpAssembler(tcpChan, dnsChan, c.filterDnsPort)
// udp processor
go netlib.UdpProcessor(udpChan, dnsChan, c.filterDnsPort)

go func() {
nbPackets := 0
lastReceivedTime := time.Now()
for {
dnsPacket := <-reassembleChan
if len(dnsPacket.Payload) == 0 {
c.LogInfo("end of dns packet to process, exit loop")
break
}

// prepare dns message
dm := dnsutils.DnsMessage{}
dm.Init()

dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String()
dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String()
dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String()
dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String()
dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String()
dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String()

dm.DNS.Payload = dnsPacket.Payload
dm.DNS.Length = len(dnsPacket.Payload)

dm.DnsTap.Identity = c.identity
dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second()
dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano())

// count it
nbPackets++

// send DNS message to DNS processor
c.dnsProcessor.GetChannel() <- dm
select {
case dnsPacket, noMore := <-dnsChan:
if !noMore {
goto end
}

lastReceivedTime = time.Now()
// prepare dns message
dm := dnsutils.DnsMessage{}
dm.Init()

dm.NetworkInfo.Family = dnsPacket.IpLayer.EndpointType().String()
dm.NetworkInfo.QueryIp = dnsPacket.IpLayer.Src().String()
dm.NetworkInfo.ResponseIp = dnsPacket.IpLayer.Dst().String()
dm.NetworkInfo.QueryPort = dnsPacket.TransportLayer.Src().String()
dm.NetworkInfo.ResponsePort = dnsPacket.TransportLayer.Dst().String()
dm.NetworkInfo.Protocol = dnsPacket.TransportLayer.EndpointType().String()
dm.NetworkInfo.IpDefragmented = dnsPacket.IpDefragmented
dm.NetworkInfo.TcpReassembled = dnsPacket.TcpReassembled

dm.DNS.Payload = dnsPacket.Payload
dm.DNS.Length = len(dnsPacket.Payload)

dm.DnsTap.Identity = c.identity
dm.DnsTap.TimeSec = dnsPacket.Timestamp.Second()
dm.DnsTap.TimeNsec = int(dnsPacket.Timestamp.UnixNano())

// count it
nbPackets++

// send DNS message to DNS processor
c.dnsProcessor.GetChannel() <- dm
case <-time.After(10 * time.Second):
elapsed := time.Since(lastReceivedTime)
if elapsed >= 10*time.Second {
close(fragIp4Chan)
close(fragIp6Chan)
close(udpChan)
close(tcpChan)
close(dnsChan)
}
}
}
c.LogInfo("number of [%d] packet(s) processed", nbPackets)
done <- true
end:
c.LogInfo("pcap file [%s]: %d DNS packet(s) detected", fileName, nbPackets)
}()

nbPackets := 0
for {
packet, err := packetSource.NextPacket()

if errors.Is(err, io.EOF) {
c.LogInfo("end of packet from pcap file, exit loop")
break
}
if err != nil {
c.LogError("unable to read packet: %s", err)
break
}

if packet.TransportLayer().LayerType() == layers.LayerTypeUDP {
p := packet.TransportLayer().(*layers.UDP)
if int(p.SrcPort) != c.filterDnsPort && int(p.DstPort) != c.filterDnsPort {
continue
}
nbPackets++

reassembleChan <- DnsDataStruct{
Payload: p.Payload,
IpLayer: packet.NetworkLayer().NetworkFlow(),
TransportLayer: p.TransportFlow(),
Timestamp: packet.Metadata().Timestamp,
}
// some security checks
if packet.NetworkLayer() == nil {
continue
}
if packet.TransportLayer() == nil {
continue
}

// ipv4 fragmented packet ?
if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv4 {
ip4 := packet.NetworkLayer().(*layers.IPv4)
if ip4.Flags&layers.IPv4MoreFragments == 1 || ip4.FragOffset > 0 {
fragIp4Chan <- packet
continue
}
}

if packet.TransportLayer().LayerType() == layers.LayerTypeTCP {
p := packet.TransportLayer().(*layers.TCP)
if int(p.SrcPort) != c.filterDnsPort && int(p.DstPort) != c.filterDnsPort {
// ipv6 fragmented packet ?
if packet.NetworkLayer().LayerType() == layers.LayerTypeIPv6 {
v6frag := packet.Layer(layers.LayerTypeIPv6Fragment)
if v6frag != nil {
fragIp6Chan <- packet
continue
}
assembler.AssembleWithTimestamp(
packet.NetworkLayer().NetworkFlow(),
packet.TransportLayer().(*layers.TCP),
packet.Metadata().Timestamp,
)
}

// tcp or udp packets ?
if packet.TransportLayer().LayerType() == layers.LayerTypeUDP {
udpChan <- packet
}
if packet.TransportLayer().LayerType() == layers.LayerTypeTCP {
tcpChan <- packet
}

}

// remove it ?
assembler.FlushAll()
c.LogInfo("processing of pcap file [%s] terminated", fileName)

// send empty packet to stop the goroutine
// and wait
reassembleChan <- DnsDataStruct{}
<-done
//assembler.FlushAll()
c.LogInfo("pcap file [%s] processing terminated, %d packet(s) read", fileName, nbPackets)

// remove it ?
if c.config.Collectors.FileIngestor.DeleteAfter {
Expand Down
Loading