Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new transform to detect duplicated traffic #282

Merged
merged 5 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Additionally, DNS-collector also support

**Transformers**:

- [`Traffic Reducer`](doc/transformers.md#traffic-reducer)
- Detect repetitive queries/replies and log once
- [`Extractor`](doc/transformers.md#extract)
- Add base64 encoded dns payload
- [`Latency Computing`](doc/transformers.md#latency-computing)
Expand Down
1 change: 1 addition & 0 deletions collectors/dnstap_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type DnstapProcessor struct {

func NewDnstapProcessor(config *dnsutils.Config, logger *logger.Logger, name string) DnstapProcessor {
logger.Info("[%s] dnstap processor - initialization...", name)

d := DnstapProcessor{
done: make(chan bool),
recvFrom: make(chan []byte, 512),
Expand Down
12 changes: 12 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ multiplexer:
# list of transforms to apply on collectors or loggers
################################################

# # Use this transformer to add base64 dns payload in JSON ouput
# extract:
# # enable payload base64 encoding
# add-payload: true

# # Use this transformer to detect trafic duplication
# reducer:
# # enable detector
# repetitive-traffic-detector: true
# # watch interval in seconds
# watch-interval: 5

# # Use this transformer to compute latency and detect timeout on queries
# latency:
# # Measure latency between replies and queries
Expand Down
9 changes: 9 additions & 0 deletions dnsutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type ConfigTransformers struct {
UnansweredQueries bool `yaml:"unanswered-queries"`
QueriesTimeout int `yaml:"queries-timeout"`
}
Reducer struct {
Enable bool `yaml:"enable"`
RepetitiveTrafficDetector bool `yaml:"repetitive-traffic-detector"`
WatchInterval int `yaml:"watch-interval"`
}
Filtering struct {
Enable bool `yaml:"enable"`
DropFqdnFile string `yaml:"drop-fqdn-file"`
Expand Down Expand Up @@ -120,6 +125,10 @@ func (c *ConfigTransformers) SetDefault() {
c.Latency.UnansweredQueries = false
c.Latency.QueriesTimeout = 2

c.Reducer.Enable = false
c.Reducer.RepetitiveTrafficDetector = false
c.Reducer.WatchInterval = 5

c.Filtering.Enable = false
c.Filtering.DropFqdnFile = ""
c.Filtering.DropDomainFile = ""
Expand Down
30 changes: 30 additions & 0 deletions dnsutils/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dnsutils

import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -27,6 +28,7 @@ var (
GeoIPDirectives = regexp.MustCompile(`^geoip-*`)
SuspiciousDirectives = regexp.MustCompile(`^suspicious-*`)
PublicSuffixDirectives = regexp.MustCompile(`^publixsuffix-*`)
ExtractedDirectives = regexp.MustCompile(`^extracted-*`)
)

func GetIpPort(dm *DnsMessage) (string, int, string, int) {
Expand Down Expand Up @@ -108,6 +110,8 @@ type Dns struct {
Flags DnsFlags `json:"flags" msgpack:"flags"`
DnsRRs DnsRRs `json:"resource-records" msgpack:"resource-records"`
MalformedPacket bool `json:"malformed-packet" msgpack:"malformed-packet"`

Repeated int `json:"repeated" msgpack:"repeated"`
}

type DnsOption struct {
Expand Down Expand Up @@ -204,6 +208,7 @@ func (dm *DnsMessage) Init() {
Qtype: "-",
Qname: "-",
DnsRRs: DnsRRs{Answers: []DnsAnswer{}, Nameservers: []DnsAnswer{}, Records: []DnsAnswer{}},
Repeated: -1,
}

dm.EDNS = DnsExtended{
Expand Down Expand Up @@ -327,12 +332,30 @@ func (dm *DnsMessage) handlePublicSuffixDirectives(directives []string, s *bytes
}
}

func (dm *DnsMessage) handleExtractedDirectives(directives []string, s *bytes.Buffer) {
if dm.Extracted == nil {
s.WriteString("-")
} else {
switch directive := directives[0]; {
case directive == "extracted-dns-payload":
if len(dm.DNS.Payload) > 0 {
dst := make([]byte, base64.StdEncoding.EncodedLen(len(dm.DNS.Payload)))
base64.StdEncoding.Encode(dst, dm.DNS.Payload)
s.Write(dst)
} else {
s.WriteString("-")
}
}
}
}

func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundary string) []byte {
var s bytes.Buffer

for i, word := range format {
directives := strings.SplitN(word, ":", 2)
switch directive := directives[0]; {
// default directives
case directive == "ttl":
if len(dm.DNS.DnsRRs.Answers) > 0 {
s.WriteString(strconv.Itoa(dm.DNS.DnsRRs.Answers[0].Ttl))
Expand Down Expand Up @@ -456,14 +479,21 @@ func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundar
} else {
s.WriteString("-")
}
case directive == "repeated":
s.WriteString(strconv.Itoa(dm.DNS.Repeated))
// more directives from collectors
case PdnsDirectives.MatchString(directive):
dm.handlePdnsDirectives(directives, &s)
// more directives from transformers
case GeoIPDirectives.MatchString(directive):
dm.handleGeoIPDirectives(directives, &s)
case SuspiciousDirectives.MatchString(directive):
dm.handleSuspiciousDirectives(directives, &s)
case PublicSuffixDirectives.MatchString(directive):
dm.handlePublicSuffixDirectives(directives, &s)
case ExtractedDirectives.MatchString(directive):
dm.handleExtractedDirectives(directives, &s)
// error unsupport directive for text format
default:
log.Fatalf("unsupport directive for text format: %s", word)
}
Expand Down
2 changes: 1 addition & 1 deletion doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ You must defined the list of
- `routes`: routing definition

If you want apply some modifications in traffic, you can do that with [transformers](/doc/transformers.md).
It can also be done on collectors or loggers.
Transformers can be applied on collectors or loggers.

### Collectors

Expand Down
24 changes: 12 additions & 12 deletions doc/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ go generate .

Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
type ConfigTransformers struct {
MyTransform struct {
Enable bool `yaml:"enable"`
}
}
```

```
```golang
func (c *ConfigTransformers) SetDefault() {
c.MyTransform.Enable = false
}
```

Create the following file `transformers/mytransform.go` and `loggers/mytransform_test.go`

```
```golang
type MyTransform struct {
config *dnsutils.ConfigTransformers
}
Expand All @@ -112,7 +112,7 @@ func NewMyTransform(config *dnsutils.ConfigTransformers) MyTransform {

Declare the transfomer in the following file `subprocessor.go`

```
```golang
func NewTransforms(
d := Transforms{
MyTransform: NewMyTransform(config, logger, name, outChannels),
Expand All @@ -126,7 +126,7 @@ Finally update the docs `doc/transformers.md` and `README.md`

1. Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
Loggers struct {
MyLogger struct {
Enable bool `yaml:"enable"`
Expand All @@ -135,15 +135,15 @@ Loggers struct {

```

```
```golang
func (c *Config) SetDefault() {
c.Loggers.MyLogger.Enable = false
}
```

2. Create the following file `loggers/mylogger.go` and `loggers/mylogger_test.go`

```
```golang
package loggers

import (
Expand Down Expand Up @@ -218,7 +218,7 @@ func (o *MyLogger) Run() {

3. Update the main file `dnscollector.go`

```
```golang
if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewMyLogger(subcfg, logger, output.Name)
}
Expand All @@ -230,23 +230,23 @@ if subcfg.Loggers.MyLogger.Enable && IsLoggerRouted(config, output.Name) {

Add Configuration `dnsutils/config.go` and `config.yml`

```
```golang
Collectors struct {
MyCollector struct {
Enable bool `yaml:"enable"`
} `yaml:"tail"`
}
```

```
```golang
func (c *Config) SetDefault() {
c.Collectors.MyCollector.Enable = false
}
```

Create the following file `collectors/mycollector.go` and `collectors/mycollector_test.go`

```
```golang
package collectors

import (
Expand Down Expand Up @@ -327,7 +327,7 @@ func (c *MyCollector) Run() {

Update the main file `dnscollector.go`

```
```golang
if subcfg.Collectors.MyCollector.Enable && IsCollectorRouted(config, input.Name) {
mapCollectors[input.Name] = collectors.NewMyCollector(nil, subcfg, logger, input.Name)
}
Expand Down
42 changes: 34 additions & 8 deletions doc/transformers.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- [Traffic filtering](#traffic-filtering)
- [Suspicious](#suspicious)
- [Latency Computing](#latency-computing)
- [Extract](#extract)
- [Extractor](#extractor)
- [Traffic reducer](#traffic-reducer)

## Transformers

Expand Down Expand Up @@ -136,7 +137,7 @@ Specific directives added:
- `geoip-as-number`: autonomous system number
- `geoip-as-owner`: autonomous system organization/owner

### Traffic filtering
### Traffic Filtering

The filtering feature can be used to ignore some queries or replies according to:
- qname
Expand Down Expand Up @@ -207,6 +208,9 @@ transforms:
threshold-max-labels: 10
```

Specific directive(s) available for the text format:
- `suspicious-score`: suspicious score for unusual traffic

When the feature is enabled, the following json field are populated in your DNS message:

Example:
Expand All @@ -224,9 +228,6 @@ Example:
}
```

Specific directive(s) added:
- `suspicious-score`: suspicious score for unusual traffic

### Latency Computing


Expand Down Expand Up @@ -263,21 +264,46 @@ Example of DNS messages in text format
2023-04-11T18:42:50.939138364Z dnsdist1 CLIENT_QUERY TIMEOUT 127.0.0.1 52376 IPv4 UDP 54b www.google.fr A -
```

### Extract
### Traffic Reducer

Use this transformer to detect repetitive traffic

Options:
- `repetitive-traffic-detector`: (boolean) detect repetitive traffic
- `watch-interval`: (integer) watch interval in seconds

Default values:

```yaml
transforms:
reducer:
repetitive-traffic-detector: true
watch-interval: 5
```

Specific directive(s) available for the text format:
- `repeated`: display the number of detected duplication

### Extractor

Use this transformer to extract the raw dns payload encoded in base64:

Options:
- `add-payload`: (boolean) add base64 encoded dns payload

Default values:

```yaml
transforms:
extract:
add-payload: true
add-payload: false
```

Specific directive(s) available for the text format:
- `extracted-dns-payload`: add the base64 encoded of the dns message

When the feature is enabled, an "extracted" field appears in the DNS message and is populated with a "dns_payload" field:

```
```json
{"network":{"family":"IPv4","protocol":"UDP","query-ip":"10.1.0.123","query-port":"56357","response-ip":"10.7.0.252","response-port":"53","ip-defragmented":false,"tcp-reassembled":false},"dns":{"length":63,"opcode":0,"rcode":"NOERROR","qname":"orange-sanguine.fr","qtype":"A","flags":{"qr":true,"tc":false,"aa":false,"ra":true,"ad":false},"resource-records":{"an":[{"name":"orange-sanguine.fr","rdatatype":"A","ttl":21600,"rdata":"193.203.239.81"}],"ns":[],"ar":[]},"malformed-packet":false},"edns":{"udp-size":1232,"rcode":0,"version":0,"dnssec-ok":0,"options":[]},"dnstap":{"operation":"CLIENT_RESPONSE","identity":"dns-collector","version":"-","timestamp-rfc3339ns":"2023-04-19T11:23:56.018192608Z","latency":"0.000000"},"extracted":{"dns_payload":"P6CBgAABAAEAAAABD29yYW5nZS1zYW5ndWluZQJmcgAAAQABwAwAAQABAABUYAAEwcvvUQAAKQTQAAAAAAAA"}}
```
2 changes: 0 additions & 2 deletions loggers/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loggers

import (
"crypto/tls"
"fmt"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -220,7 +219,6 @@ LOOP:
// flush the buffer
case <-flushTimer.C:
if !o.writerReady {
fmt.Println("buffer cleared!")
bufferDm = nil
continue
}
Expand Down
8 changes: 3 additions & 5 deletions loggers/redispub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loggers

import (
"bufio"
"fmt"
"net"
"regexp"
"testing"
Expand Down Expand Up @@ -69,19 +68,18 @@ func Test_RedisPubRun(t *testing.T) {
reader := bufio.NewReader(conn)
line, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Erreur de lecture de la réponse :", err)
t.Error(err)
return
}
fmt.Println(line)

pattern := regexp.MustCompile(tc.pattern)
if !pattern.MatchString(line) {
t.Errorf("syslog error want %s, got: %s", tc.pattern, line)
t.Errorf("redis error want %s, got: %s", tc.pattern, line)
}

pattern2 := regexp.MustCompile("PUBLISH \"testons\"")
if !pattern2.MatchString(line) {
t.Errorf("syslog error want %s, got: %s", pattern2, line)
t.Errorf("redis error want %s, got: %s", pattern2, line)
}
})
}
Expand Down
Loading