Skip to content

Commit

Permalink
feat: new transform to detect duplicated traffic (#282)
Browse files Browse the repository at this point in the history
* feat duplicated traffic
* update docs
  • Loading branch information
dmachard authored Apr 21, 2023
1 parent b3eba9b commit da88670
Show file tree
Hide file tree
Showing 16 changed files with 424 additions and 115 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Additionally, DNS-collector also support

**Transformers**:

- [`Traffic Reducer`](doc/transformers.md#traffic-reducer)
- Detect repetitive queries/replies and log once
- [`Extractor`](doc/transformers.md#extract)
- Add base64 encoded dns payload
- [`Latency Computing`](doc/transformers.md#latency-computing)
Expand Down
1 change: 1 addition & 0 deletions collectors/dnstap_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type DnstapProcessor struct {

func NewDnstapProcessor(config *dnsutils.Config, logger *logger.Logger, name string) DnstapProcessor {
logger.Info("[%s] dnstap processor - initialization...", name)

d := DnstapProcessor{
done: make(chan bool),
recvFrom: make(chan []byte, 512),
Expand Down
12 changes: 12 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ multiplexer:
# list of transforms to apply on collectors or loggers
################################################

# # Use this transformer to add base64 dns payload in JSON ouput
# extract:
# # enable payload base64 encoding
# add-payload: true

# # Use this transformer to detect trafic duplication
# reducer:
# # enable detector
# repetitive-traffic-detector: true
# # watch interval in seconds
# watch-interval: 5

# # Use this transformer to compute latency and detect timeout on queries
# latency:
# # Measure latency between replies and queries
Expand Down
9 changes: 9 additions & 0 deletions dnsutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type ConfigTransformers struct {
UnansweredQueries bool `yaml:"unanswered-queries"`
QueriesTimeout int `yaml:"queries-timeout"`
}
Reducer struct {
Enable bool `yaml:"enable"`
RepetitiveTrafficDetector bool `yaml:"repetitive-traffic-detector"`
WatchInterval int `yaml:"watch-interval"`
}
Filtering struct {
Enable bool `yaml:"enable"`
DropFqdnFile string `yaml:"drop-fqdn-file"`
Expand Down Expand Up @@ -120,6 +125,10 @@ func (c *ConfigTransformers) SetDefault() {
c.Latency.UnansweredQueries = false
c.Latency.QueriesTimeout = 2

c.Reducer.Enable = false
c.Reducer.RepetitiveTrafficDetector = false
c.Reducer.WatchInterval = 5

c.Filtering.Enable = false
c.Filtering.DropFqdnFile = ""
c.Filtering.DropDomainFile = ""
Expand Down
30 changes: 30 additions & 0 deletions dnsutils/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dnsutils

import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -27,6 +28,7 @@ var (
GeoIPDirectives = regexp.MustCompile(`^geoip-*`)
SuspiciousDirectives = regexp.MustCompile(`^suspicious-*`)
PublicSuffixDirectives = regexp.MustCompile(`^publixsuffix-*`)
ExtractedDirectives = regexp.MustCompile(`^extracted-*`)
)

func GetIpPort(dm *DnsMessage) (string, int, string, int) {
Expand Down Expand Up @@ -108,6 +110,8 @@ type Dns struct {
Flags DnsFlags `json:"flags" msgpack:"flags"`
DnsRRs DnsRRs `json:"resource-records" msgpack:"resource-records"`
MalformedPacket bool `json:"malformed-packet" msgpack:"malformed-packet"`

Repeated int `json:"repeated" msgpack:"repeated"`
}

type DnsOption struct {
Expand Down Expand Up @@ -204,6 +208,7 @@ func (dm *DnsMessage) Init() {
Qtype: "-",
Qname: "-",
DnsRRs: DnsRRs{Answers: []DnsAnswer{}, Nameservers: []DnsAnswer{}, Records: []DnsAnswer{}},
Repeated: -1,
}

dm.EDNS = DnsExtended{
Expand Down Expand Up @@ -327,12 +332,30 @@ func (dm *DnsMessage) handlePublicSuffixDirectives(directives []string, s *bytes
}
}

func (dm *DnsMessage) handleExtractedDirectives(directives []string, s *bytes.Buffer) {
if dm.Extracted == nil {
s.WriteString("-")
} else {
switch directive := directives[0]; {
case directive == "extracted-dns-payload":
if len(dm.DNS.Payload) > 0 {
dst := make([]byte, base64.StdEncoding.EncodedLen(len(dm.DNS.Payload)))
base64.StdEncoding.Encode(dst, dm.DNS.Payload)
s.Write(dst)
} else {
s.WriteString("-")
}
}
}
}

func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundary string) []byte {
var s bytes.Buffer

for i, word := range format {
directives := strings.SplitN(word, ":", 2)
switch directive := directives[0]; {
// default directives
case directive == "ttl":
if len(dm.DNS.DnsRRs.Answers) > 0 {
s.WriteString(strconv.Itoa(dm.DNS.DnsRRs.Answers[0].Ttl))
Expand Down Expand Up @@ -456,14 +479,21 @@ func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundar
} else {
s.WriteString("-")
}
case directive == "repeated":
s.WriteString(strconv.Itoa(dm.DNS.Repeated))
// more directives from collectors
case PdnsDirectives.MatchString(directive):
dm.handlePdnsDirectives(directives, &s)
// more directives from transformers
case GeoIPDirectives.MatchString(directive):
dm.handleGeoIPDirectives(directives, &s)
case SuspiciousDirectives.MatchString(directive):
dm.handleSuspiciousDirectives(directives, &s)
case PublicSuffixDirectives.MatchString(directive):
dm.handlePublicSuffixDirectives(directives, &s)
case ExtractedDirectives.MatchString(directive):
dm.handleExtractedDirectives(directives, &s)
// error unsupport directive for text format
default:
log.Fatalf("unsupport directive for text format: %s", word)
}
Expand Down
2 changes: 1 addition & 1 deletion doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ You must defined the list of
- `routes`: routing definition

If you want apply some modifications in traffic, you can do that with [transformers](/doc/transformers.md).
It can also be done on collectors or loggers.
Transformers can be applied on collectors or loggers.

### Collectors

Expand Down
24 changes: 12 additions & 12 deletions doc/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ go generate .

Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
type ConfigTransformers struct {
MyTransform struct {
Enable bool `yaml:"enable"`
}
}
```

```
```golang
func (c *ConfigTransformers) SetDefault() {
c.MyTransform.Enable = false
}
```

Create the following file `transformers/mytransform.go` and `loggers/mytransform_test.go`

```
```golang
type MyTransform struct {
config *dnsutils.ConfigTransformers
}
Expand All @@ -112,7 +112,7 @@ func NewMyTransform(config *dnsutils.ConfigTransformers) MyTransform {

Declare the transfomer in the following file `subprocessor.go`

```
```golang
func NewTransforms(
d := Transforms{
MyTransform: NewMyTransform(config, logger, name, outChannels),
Expand All @@ -126,7 +126,7 @@ Finally update the docs `doc/transformers.md` and `README.md`

1. Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
Loggers struct {
MyLogger struct {
Enable bool `yaml:"enable"`
Expand All @@ -135,15 +135,15 @@ Loggers struct {

```

```
```golang
func (c *Config) SetDefault() {
c.Loggers.MyLogger.Enable = false
}
```

2. Create the following file `loggers/mylogger.go` and `loggers/mylogger_test.go`

```
```golang
package loggers

import (
Expand Down Expand Up @@ -218,7 +218,7 @@ func (o *MyLogger) Run() {

3. Update the main file `dnscollector.go`

```
```golang
if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewMyLogger(subcfg, logger, output.Name)
}
Expand All @@ -230,23 +230,23 @@ if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) {

Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
Collectors struct {
MyCollector struct {
Enable bool `yaml:"enable"`
} `yaml:"tail"`
}
```

```
```golang
func (c *Config) SetDefault() {
c.Collectors.MyCollector.Enable = false
}
```

Create the following file `collectors/mycollector.go` and `collectors/mycollector_test.go`

```
```golang
package collectors

import (
Expand Down Expand Up @@ -327,7 +327,7 @@ func (c *MyCollector) Run() {

Update the main file `dnscollector.go`

```
```golang
if subcfg.Collectors.MyCollector.Enable && IsCollectorRouted(config, input.Name) {
mapCollectors[input.Name] = collectors.NewMyCollector(nil, subcfg, logger, input.Name)
}
Expand Down
42 changes: 34 additions & 8 deletions doc/transformers.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- [Traffic filtering](#traffic-filtering)
- [Suspicious](#suspicious)
- [Latency Computing](#latency-computing)
- [Extract](#extract)
- [Extractor](#extractor)
- [Traffic reducer](#traffic-reducer)

## Transformers

Expand Down Expand Up @@ -136,7 +137,7 @@ Specific directives added:
- `geoip-as-number`: autonomous system number
- `geoip-as-owner`: autonomous system organization/owner

### Traffic filtering
### Traffic Filtering

The filtering feature can be used to ignore some queries or replies according to:
- qname
Expand Down Expand Up @@ -207,6 +208,9 @@ transforms:
threshold-max-labels: 10
```

Specific directive(s) available for the text format:
- `suspicious-score`: suspicious score for unusual traffic

When the feature is enabled, the following json field are populated in your DNS message:

Example:
Expand All @@ -224,9 +228,6 @@ Example:
}
```

Specific directive(s) added:
- `suspicious-score`: suspicious score for unusual traffic

### Latency Computing


Expand Down Expand Up @@ -263,21 +264,46 @@ Example of DNS messages in text format
2023-04-11T18:42:50.939138364Z dnsdist1 CLIENT_QUERY TIMEOUT 127.0.0.1 52376 IPv4 UDP 54b www.google.fr A -
```
### Extract
### Traffic Reducer
Use this transformer to detect repetitive traffic
Options:
- `repetitive-traffic-detector`: (boolean) detect repetitive traffic
- `watch-interval`: (integer) watch interval in seconds
Default values:
```yaml
transforms:
reducer:
repetitive-traffic-detector: true
watch-interval: 5
```

Specific directive(s) available for the text format:
- `repeated`: display the number of detected duplication

### Extractor

Use this transformer to extract the raw dns payload encoded in base64:

Options:
- `add-payload`: (boolean) add base64 encoded dns payload

Default values:

```yaml
transforms:
extract:
add-payload: true
add-payload: false
```
Specific directive(s) available for the text format:
- `extracted-dns-payload`: add the base64 encoded of the dns message

When the feature is enabled, an "extracted" field appears in the DNS message and is populated with a "dns_payload" field:

```
```json
{"network":{"family":"IPv4","protocol":"UDP","query-ip":"10.1.0.123","query-port":"56357","response-ip":"10.7.0.252","response-port":"53","ip-defragmented":false,"tcp-reassembled":false},"dns":{"length":63,"opcode":0,"rcode":"NOERROR","qname":"orange-sanguine.fr","qtype":"A","flags":{"qr":true,"tc":false,"aa":false,"ra":true,"ad":false},"resource-records":{"an":[{"name":"orange-sanguine.fr","rdatatype":"A","ttl":21600,"rdata":"193.203.239.81"}],"ns":[],"ar":[]},"malformed-packet":false},"edns":{"udp-size":1232,"rcode":0,"version":0,"dnssec-ok":0,"options":[]},"dnstap":{"operation":"CLIENT_RESPONSE","identity":"dns-collector","version":"-","timestamp-rfc3339ns":"2023-04-19T11:23:56.018192608Z","latency":"0.000000"},"extracted":{"dns_payload":"P6CBgAABAAEAAAABD29yYW5nZS1zYW5ndWluZQJmcgAAAQABwAwAAQABAABUYAAEwcvvUQAAKQTQAAAAAAAA"}}
```
2 changes: 0 additions & 2 deletions loggers/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loggers

import (
"crypto/tls"
"fmt"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -220,7 +219,6 @@ LOOP:
// flush the buffer
case <-flushTimer.C:
if !o.writerReady {
fmt.Println("buffer cleared!")
bufferDm = nil
continue
}
Expand Down
8 changes: 3 additions & 5 deletions loggers/redispub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loggers

import (
"bufio"
"fmt"
"net"
"regexp"
"testing"
Expand Down Expand Up @@ -69,19 +68,18 @@ func Test_RedisPubRun(t *testing.T) {
reader := bufio.NewReader(conn)
line, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Erreur de lecture de la réponse :", err)
t.Error(err)
return
}
fmt.Println(line)

pattern := regexp.MustCompile(tc.pattern)
if !pattern.MatchString(line) {
t.Errorf("syslog error want %s, got: %s", tc.pattern, line)
t.Errorf("redis error want %s, got: %s", tc.pattern, line)
}

pattern2 := regexp.MustCompile("PUBLISH \"testons\"")
if !pattern2.MatchString(line) {
t.Errorf("syslog error want %s, got: %s", pattern2, line)
t.Errorf("redis error want %s, got: %s", pattern2, line)
}
})
}
Expand Down
Loading

0 comments on commit da88670

Please sign in to comment.