feat: new kafka logger (#291)
* first  shot for #177

* Update docs

* add testunit
dmachard authored May 5, 2023
1 parent 721869c commit e7dab11
Showing 12 changed files with 627 additions and 42 deletions.
3 changes: 2 additions & 1 deletion .vscode/c_cpp_properties.json
@@ -10,7 +10,8 @@
"compilerPath": "/usr/bin/clang",
"cStandard": "c17",
"cppStandard": "c++14",
"intelliSenseMode": "linux-clang-x64"
"intelliSenseMode": "linux-clang-x64",
"configurationProvider": "ms-vscode.makefile-tools"
}
],
"version": 4
1 change: 1 addition & 0 deletions README.md
@@ -50,6 +50,7 @@ Additionally, DNS-collector also support
- [`ElasticSearch`](doc/loggers.md#elasticsearch-client)
- [`Scalyr`](doc/loggers.md#scalyr-client)
- [`Redis`](doc/loggers.md#redispub)
- [`Kafka`](doc/loggers.md#kafkaproducer)
- *Send to security tools*
- [`Falco`](doc/loggers.md#falco)

86 changes: 61 additions & 25 deletions config.yml
@@ -198,7 +198,7 @@ multiplexer:
# stdout:
# # output format: text|json|flat-json
# mode: text
# # output text format, please refer at the end if this config to see all available directives
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"

# # rest api server
@@ -268,7 +268,7 @@ multiplexer:
# compress-postcommand: null
# # output format: text|json|pcap|dnstap|flat-json
# mode: text
# # output text format, please refer at the end if this config to see all available directives
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# # run external script after each file rotation
# postrotate-command: null
@@ -322,33 +322,36 @@ multiplexer:
# tls-insecure: false
# # output format: text|json|flat-json
# mode: json
# # output text format, please refer at the end if this config to see all available directives
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# # delimiter to use between payload sent
# delimiter: "\n"
# # number of dns messages in buffer
# # how many DNS messages will be buffered before being sent
# buffer-size: 100


# # Send captured traffic to a redis channel, mapped on TCP client logger options
# loggers:
# - name: redis
# redispub:
# mode: json
# transport: tcp
# remote-address: 127.0.0.1
# remote-port: 6379
# sock-path: null
# connect-timeout: 5
# retry-interval: 10
# flush-interval: 2
# tls-support: false
# tls-insecure: false
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# delimiter: "\n"
# buffer-size: 100
# # Name of the channel to publish into
# redis-channel: dns-collector
# redispub:
# # output format: text|json|flat-json
# mode: json
# # remote address
# remote-address: 127.0.0.1
# # remote tcp port
# remote-port: 6379
# # connect timeout
# connect-timeout: 5
# retry-interval: 10
# flush-interval: 2
# # enable tls
# tls-support: false
# tls-insecure: false
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# delimiter: "\n"
# # how many DNS messages will be buffered before being sent
# buffer-size: 100
# # Name of the channel to publish into
# redis-channel: dns-collector

# # redirect captured dns traffic to a remote syslog server or local one
# syslog:
@@ -361,7 +364,7 @@ multiplexer:
# transport: local
# # Remote address host:port
# remote-address: ""
# # output text format, please refer at the end if this config to see all available directives
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# # output format: text|json|flat-json
# mode: text
@@ -399,7 +402,7 @@ multiplexer:
# tls-support: false
# # insecure skip verify
# tls-insecure: false
# # number of dns messages in buffer
# # how many DNS messages will be buffered before being sent
# buffer-size: 100

# # resend captured dns traffic to a InfluxDB database
@@ -474,7 +477,7 @@ multiplexer:
# scalyrclient:
# # output format: text|json|flat-json
# mode: text
# # output text format, please refer at the end if this config to see all available directives
# # output text format, please refer to the top of this file to see all available directives
# text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"
# # Any "session" information for the Scalyr backend. By default, "serverHost" is set to the hostname of the machine
# sessioninfo: {}
@@ -495,6 +498,39 @@ multiplexer:
# # tls min version
# tls-min-version: 1.2

# # resend captured dns traffic to a kafka sink
# kafkaproducer:
# # remote address
# remote-address: 127.0.0.1
# # remote tcp port
# remote-port: 9092
# # connect timeout in seconds
# connect-timeout: 5
# # interval in seconds between reconnect retries
# retry-interval: 10
# # interval in seconds before the buffer is flushed
# flush-interval: 30
# # enable tls
# tls-support: false
# # insecure skip verify
# tls-insecure: false
# # enable SASL
# sasl-support: false
# # SASL mechanism: PLAIN|SCRAM-SHA-512
# sasl-mechanism: PLAIN
# # SASL username
# sasl-username: ""
# # SASL password
# sasl-password: ""
# # output format: text|json|flat-json
# mode: flat-json
# # how many DNS messages will be buffered before being sent
# buffer-size: 100
# # Kafka topic to forward messages to
# topic: "dnscollector"
# # Kafka partition
# partition: 0
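
# Read together, the commented options above describe a complete logger. A minimal,
# uncommented sketch of a pipeline that feeds DNS traffic into this logger might
# look like the following (the `tap` collector and the stream names are
# illustrative, not part of this commit):

```yaml
multiplexer:
  collectors:
    - name: tap
      dnstap:
        listen-ip: 0.0.0.0
        listen-port: 6000
  loggers:
    - name: kafka
      kafkaproducer:
        remote-address: 127.0.0.1
        remote-port: 9092
        mode: flat-json
        topic: "dnscollector"
  routes:
    - from: [ tap ]
      to: [ kafka ]
```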

# # Send captured traffic to falco (https://falco.org/), for security and advanced inspection
# falco:
# # remote falco plugin endpoint
3 changes: 3 additions & 0 deletions dnscollector.go
@@ -165,6 +165,9 @@ func main() {
if subcfg.Loggers.RedisPub.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewRedisPub(subcfg, logger, output.Name)
}
if subcfg.Loggers.KafkaProducer.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewKafkaProducer(subcfg, logger, output.Name)
}
if subcfg.Loggers.FalcoClient.Enable && IsLoggerRouted(config, output.Name) {
mapLoggers[output.Name] = loggers.NewFalcoClient(subcfg, logger, output.Name)
}
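
The hunk above follows the file's existing pattern: each logger type is constructed and registered into `mapLoggers` only when its config section is enabled and the output is routed. A standalone Go sketch of that gate-and-register idea, with simplified types rather than the real `loggers` package API:

```go
package main

import "fmt"

// Logger is a minimal stand-in for the collector's logger interface
// (hypothetical simplification, not the real type).
type Logger interface{ Name() string }

type kafkaProducer struct{ name string }

func (k *kafkaProducer) Name() string { return k.name }

// newKafkaProducer stands in for loggers.NewKafkaProducer.
func newKafkaProducer(name string) Logger { return &kafkaProducer{name: name} }

// buildLoggers mirrors the registration pattern from dnscollector.go:
// a logger is only constructed when its config section is enabled.
func buildLoggers(enabled map[string]bool) map[string]Logger {
	mapLoggers := make(map[string]Logger)
	if enabled["kafkaproducer"] {
		mapLoggers["kafka"] = newKafkaProducer("kafka")
	}
	return mapLoggers
}

func main() {
	loggers := buildLoggers(map[string]bool{"kafkaproducer": true})
	fmt.Println(len(loggers)) // prints 1
}
```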
37 changes: 37 additions & 0 deletions dnsutils/config.go
@@ -411,6 +411,25 @@ type Config struct {
ConnectTimeout int `yaml:"connect-timeout"`
RedisChannel string `yaml:"redis-channel"`
} `yaml:"redispub"`
KafkaProducer struct {
Enable bool `yaml:"enable"`
RemoteAddress string `yaml:"remote-address"`
RemotePort int `yaml:"remote-port"`
RetryInterval int `yaml:"retry-interval"`
TlsSupport bool `yaml:"tls-support"`
TlsInsecure bool `yaml:"tls-insecure"`
TlsMinVersion string `yaml:"tls-min-version"`
SaslSupport bool `yaml:"sasl-support"`
SaslUsername string `yaml:"sasl-username"`
SaslPassword string `yaml:"sasl-password"`
SaslMechanism string `yaml:"sasl-mechanism"`
Mode string `yaml:"mode"`
BufferSize int `yaml:"buffer-size"`
FlushInterval int `yaml:"flush-interval"`
ConnectTimeout int `yaml:"connect-timeout"`
Topic string `yaml:"topic"`
Partition int `yaml:"partition"`
} `yaml:"kafkaproducer"`
FalcoClient struct {
Enable bool `yaml:"enable"`
URL string `yaml:"url"`
@@ -650,6 +669,24 @@ func (c *Config) SetDefault() {
c.Loggers.RedisPub.FlushInterval = 30
c.Loggers.RedisPub.RedisChannel = "dns_collector"

c.Loggers.KafkaProducer.Enable = false
c.Loggers.KafkaProducer.RemoteAddress = LOCALHOST_IP
c.Loggers.KafkaProducer.RemotePort = 9092
c.Loggers.KafkaProducer.RetryInterval = 10
c.Loggers.KafkaProducer.TlsSupport = false
c.Loggers.KafkaProducer.TlsInsecure = false
c.Loggers.KafkaProducer.TlsMinVersion = TLS_v12
c.Loggers.KafkaProducer.SaslSupport = false
c.Loggers.KafkaProducer.SaslUsername = ""
c.Loggers.KafkaProducer.SaslPassword = ""
c.Loggers.KafkaProducer.SaslMechanism = SASL_MECHANISM_PLAIN
c.Loggers.KafkaProducer.Mode = MODE_FLATJSON
c.Loggers.KafkaProducer.BufferSize = 100
c.Loggers.KafkaProducer.ConnectTimeout = 5
c.Loggers.KafkaProducer.FlushInterval = 30
c.Loggers.KafkaProducer.Topic = "dnscollector"
c.Loggers.KafkaProducer.Partition = 0

c.Loggers.FalcoClient.Enable = false
c.Loggers.FalcoClient.URL = "http://127.0.0.1:9200"

3 changes: 3 additions & 0 deletions dnsutils/constants.go
@@ -16,6 +16,9 @@ const (
MODE_PCAP = "pcap"
MODE_DNSTAP = "dnstap"

SASL_MECHANISM_PLAIN = "PLAIN"
SASL_MECHANISM_SCRAM = "SCRAM-SHA-512"

DNS_RCODE_NXDOMAIN = "NXDOMAIN"
DNS_RCODE_SERVFAIL = "SERVFAIL"
DNS_RCODE_TIMEOUT = "TIMEOUT"
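
The two constants added to this hunk are the only SASL mechanisms the config accepts. A hedged sketch of how a config loader might validate the `sasl-mechanism` value against them (the helper name is illustrative, not from this commit):

```go
package main

import "fmt"

const (
	SASL_MECHANISM_PLAIN = "PLAIN"
	SASL_MECHANISM_SCRAM = "SCRAM-SHA-512"
)

// checkSaslMechanism returns an error for any value other than the two
// supported mechanisms; illustrative only, not the collector's real code.
func checkSaslMechanism(m string) error {
	switch m {
	case SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM:
		return nil
	default:
		return fmt.Errorf("unsupported sasl-mechanism: %q", m)
	}
}

func main() {
	fmt.Println(checkSaslMechanism("PLAIN"))         // <nil>
	fmt.Println(checkSaslMechanism("SCRAM-SHA-256")) // unsupported sasl-mechanism: "SCRAM-SHA-256"
}
```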
64 changes: 54 additions & 10 deletions doc/loggers.md
@@ -14,6 +14,7 @@
- [ElasticSearch](#elasticsearch-client)
- [Scalyr](#scalyr-client)
- [Redispub](#redispub)
- [Kafka](#kafkaproducer)
- [Falco](#falco)

## Loggers
@@ -25,7 +26,7 @@ Print to your standard output, all DNS logs received
* custom text format

Options:
- `mode`: (string) text or json
- `mode`: (string) output format: text, json, or flat-json
- `text-format`: (string) output text format, please refer to the default text format to see all available directives, use this parameter if you want a specific format

Default values:
@@ -143,7 +144,7 @@ Options:
- `compress`: (boolean) compress log file
- `compress-interval`: (integer) checking every X seconds if new log files must be compressed
- `compress-command`: (string) run external script after file compress step
- `mode`: (string) output format: text|json|pcap|dnstap|flat-json
- `mode`: (string) output format: text, json, flat-json, pcap or dnstap
- `text-format`: (string) output text format, please refer to the default text format to see all available directives, use this parameter if you want a specific format
- `postrotate-command`: (string) run external script after file rotation
- `postrotate-delete-success`: (boolean) delete file on script success
@@ -248,16 +249,16 @@ Tcp/unix stream client logger.

Options:
- `transport`: (string) network transport to use: tcp|unix
- `listen-ip`: (string) remote address
- `listen-port`: (integer) remote tcp port
- `remote-ip`: (string) remote address
- `remote-port`: (integer) remote tcp port
- `sock-path`: (string) unix socket path
- `connect-timeout`: (integer) connect timeout in seconds
- `retry-interval`: (integer) interval in seconds between reconnect retries
- `flush-interval`: (integer) interval in seconds before the buffer is flushed
- `tls-support`: (boolean) enable tls
- `tls-insecure`: (boolean) insecure skip verify
- `tls-min-version`: (string) min tls version, default to 1.2
- `mode`: (string) output format: text|json
- `mode`: (string) output format: text, json, or flat-json
- `text-format`: (string) output text format, please refer to the default text format to see all available directives, use this parameter if you want a specific format
- `buffer-size`: (integer) number of dns messages in buffer

@@ -292,7 +293,7 @@ Options:
- `facility`: (string) Set the syslog logging facility
- `transport`: (string) Transport to use to a remote log daemon or local one. local|tcp|udp|unix
- `remote-address`: (string) Remote address host:port
- `mode`: (string) text, json or flat-json
- `mode`: (string) output format: text, json, or flat-json
- `text-format`: (string) output text format, please refer to the default text format to see all available directives, use this parameter if you want a specific format
- `tls-support`: (boolean) enable tls
- `tls-insecure`: (boolean) insecure skip verify
@@ -387,7 +388,7 @@ Loki client to remote server
Options:
- `server-url`: (string) Loki server url
- `job-name`: (string) Job name
- `mode`: (string) text, json or flat json
- `mode`: (string) output format: text, json, or flat-json
- `flush-interval`: (integer) flush batch every X seconds
- `batch-size`: (integer) batch size for log entries in bytes
- `retry-interval`: (integer) interval in seconds between attempts to resend a batch
@@ -541,7 +542,7 @@ Options:
- `tls-support`: (boolean) enable tls
- `tls-insecure`: (boolean) insecure skip verify
- `tls-min-version`: (string) min tls version, default to 1.2
- `mode`: (string) output format: text|json
- `mode`: (string) output format: text, json, or flat-json
- `text-format`: (string) output text format, please refer to the default text format to see all available directives, use this parameter if you want a specific format
- `buffer-size`: (integer) number of dns messages in buffer
- `redis-channel`: (string) name of the redis pubsub channel to publish into
@@ -566,6 +567,49 @@ redispub:
redis-channel: dns-collector
```

### KafkaProducer

Kafka producer client, forwarding DNS logs to a Kafka topic.

Options:
- `remote-address`: (string) remote address
- `remote-port`: (integer) remote tcp port
- `connect-timeout`: (integer) connect timeout in seconds
- `retry-interval`: (integer) interval in seconds between reconnect retries
- `flush-interval`: (integer) interval in seconds before the buffer is flushed
- `tls-support`: (boolean) enable tls
- `tls-insecure`: (boolean) insecure skip verify
- `tls-min-version`: (string) min tls version, default to 1.2
- `sasl-support`: (boolean) enable SASL
- `sasl-username`: (string) SASL username
- `sasl-password`: (string) SASL password
- `sasl-mechanism`: (string) SASL mechanism: PLAIN or SCRAM-SHA-512
- `mode`: (string) output format: text, json, or flat-json
- `buffer-size`: (integer) how many DNS messages will be buffered before being sent
- `topic`: (string) kafka topic to forward messages to
- `partition`: (integer) kafka partition

Default values:

```yaml
kafkaproducer:
  remote-address: 127.0.0.1
  remote-port: 9092
  connect-timeout: 5
  retry-interval: 10
  flush-interval: 30
  tls-support: false
  tls-insecure: false
  tls-min-version: 1.2
  sasl-support: false
  sasl-mechanism: PLAIN
  sasl-username: ""
  sasl-password: ""
  mode: flat-json
  buffer-size: 100
  topic: "dnscollector"
  partition: 0
```

### Falco
Falco plugin logger, currently available at https://github.com/SysdigDan/dnscollector-falco-plugin
@@ -575,7 +619,7 @@ Options:

Default values:

```yaml
falco:
  url: "http://127.0.0.1:9200/events"
```
