Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally disable tag parsing #325

Merged
merged 5 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ metric.name[tagName=val,tag2Name=val2]:0|c
Be aware: If you mix tag styles (e.g., Librato/InfluxDB with DogStatsD), the exporter will consider this an error and the behavior is undefined.
Also, tags without values (`#some_tag`) are not supported and will be ignored.

The exporter parses all tagging formats by default, but individual tagging formats can be disabled with command line flags:
```
--no-statsd.parse-dogstatsd-tags
--no-statsd.parse-influxdb-tags
--no-statsd.parse-librato-tags
--no-statsd.parse-signalfx-tags
```

## Building and Running

NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/kingpin) flags library. With this change, flag behaviour is POSIX-ish:
Expand Down Expand Up @@ -131,6 +139,14 @@ NOTE: Version 0.7.0 switched to the [kingpin](https://github.com/alecthomas/king
--debug.dump-fsm="" The path to dump internal FSM generated for glob
matching as Dot file.
--check-config Check configuration and exit.
--statsd.parse-dogstatsd-tags
Parse DogStatsd style tags. Enabled by default.
--statsd.parse-influxdb-tags
Parse InfluxDB style tags. Enabled by default.
--statsd.parse-librato-tags
Parse Librato style tags. Enabled by default.
--statsd.parse-signalfx-tags
Parse SignalFX style tags. Enabled by default.
--log.level=info Only log messages with the given severity or
above. One of: [debug, info, warn, error]
--log.format=logfmt Output format of log messages. One of: [logfmt,
Expand Down
9 changes: 9 additions & 0 deletions bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
Expand Down Expand Up @@ -530,10 +531,17 @@ func TestHandlePacket(t *testing.T) {
},
}

parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()

for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Expand All @@ -545,6 +553,7 @@ func TestHandlePacket(t *testing.T) {
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
Expand Down
8 changes: 8 additions & 0 deletions exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
Expand All @@ -47,6 +48,12 @@ func benchmarkUDPListener(times int, b *testing.B) {
}
}

parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()

// reset benchmark timer to not measure startup costs
b.ResetTimer()

Expand All @@ -60,6 +67,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
l := listener.StatsDUDPListener{
EventHandler: &event.UnbufferedEventHandler{C: events},
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
SamplesReceived: samplesReceived,
Expand Down
19 changes: 17 additions & 2 deletions line_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,19 @@ func benchmarkLinesToEvents(times int, b *testing.B, input []string) {
// always report allocations since this is a hot path
b.ReportAllocs()

parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()

// reset benchmark timer to not measure startup costs
b.ResetTimer()

for n := 0; n < b.N; n++ {
for i := 0; i < times; i++ {
for _, l := range input {
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
}
}
}
Expand Down Expand Up @@ -75,6 +84,12 @@ func BenchmarkLineFormats(b *testing.B) {
"invalidInfluxDb": "foo3,tag1=bar,tag2:100|c",
}

parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()

// reset benchmark timer to not measure startup costs
b.ResetTimer()

Expand All @@ -83,7 +98,7 @@ func BenchmarkLineFormats(b *testing.B) {
// always report allocations since this is a hot path
b.ReportAllocs()
for n := 0; n < b.N; n++ {
line.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
parser.LineToEvents(l, *sampleErrors, samplesReceived, tagErrors, tagsReceived, nopLogger)
}
})
}
Expand Down
22 changes: 22 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/statsd_exporter/pkg/address"
"github.com/prometheus/statsd_exporter/pkg/event"
"github.com/prometheus/statsd_exporter/pkg/exporter"
"github.com/prometheus/statsd_exporter/pkg/line"
"github.com/prometheus/statsd_exporter/pkg/listener"
"github.com/prometheus/statsd_exporter/pkg/mapper"
)
Expand Down Expand Up @@ -268,6 +269,10 @@ func main() {
eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Number of events to hold in queue before flushing").Default("200ms").Duration()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
checkConfig = kingpin.Flag("check-config", "Check configuration and exit.").Default("false").Bool()
dogstatsdTagsEnabled = kingpin.Flag("statsd.parse-dogstatsd-tags", "Parse DogStatsd style tags. Enabled by default.").Default("true").Bool()
influxdbTagsEnabled = kingpin.Flag("statsd.parse-influxdb-tags", "Parse InfluxDB style tags. Enabled by default.").Default("true").Bool()
libratoTagsEnabled = kingpin.Flag("statsd.parse-librato-tags", "Parse Librato style tags. Enabled by default.").Default("true").Bool()
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
)

promlogConfig := &promlog.Config{}
Expand All @@ -277,6 +282,20 @@ func main() {
kingpin.Parse()
logger := promlog.New(promlogConfig)

parser := line.NewParser()
if *dogstatsdTagsEnabled {
parser.EnableDogstatsdParsing()
}
if *influxdbTagsEnabled {
parser.EnableInfluxdbParsing()
}
if *libratoTagsEnabled {
parser.EnableLibratoParsing()
}
if *signalFXTagsEnabled {
parser.EnableSignalFXParsing()
}

cacheOption := mapper.WithCacheType(*cacheType)

if *statsdListenUDP == "" && *statsdListenTCP == "" && *statsdListenUnixgram == "" {
Expand Down Expand Up @@ -319,6 +338,7 @@ func main() {
Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Expand Down Expand Up @@ -348,6 +368,7 @@ func main() {
Conn: tconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
Expand Down Expand Up @@ -391,6 +412,7 @@ func main() {
Conn: uxgconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UnixgramPackets: unixgramPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Expand Down
15 changes: 14 additions & 1 deletion pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,11 +617,18 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
events := make(chan event.Events)
ueh := &event.UnbufferedEventHandler{C: events}

parser := line.NewParser()
parser.EnableDogstatsdParsing()
parser.EnableInfluxdbParsing()
parser.EnableLibratoParsing()
parser.EnableSignalFXParsing()

go func() {
for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Expand All @@ -633,6 +640,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
Conn: nil,
EventHandler: nil,
Logger: log.NewNopLogger(),
LineParser: parser,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
Expand Down Expand Up @@ -1059,11 +1067,16 @@ func BenchmarkParseDogStatsDTags(b *testing.B) {
"a-z tags": "a:0,b:1,c:2,d:3,e:4,f:5,g:6,h:7,i:8,j:9,k:0,l:1,m:2,n:3,o:4,p:5,q:6,r:7,s:8,t:9,u:0,v:1,w:2,x:3,y:4,z:5",
}

parser := line.NewParser()
parser.EnableDogstatsdParsing()

b.ResetTimer()

for name, tags := range scenarios {
b.Run(name, func(b *testing.B) {
for n := 0; n < b.N; n++ {
labels := map[string]string{}
line.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
parser.ParseDogStatsDTags(tags, labels, tagErrors, log.NewNopLogger())
}
})
}
Expand Down
106 changes: 72 additions & 34 deletions pkg/line/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,40 @@ import (
"github.com/prometheus/statsd_exporter/pkg/mapper"
)

// Parser is a struct to hold configuration for parsing behavior
type Parser struct {
DogstatsdTagsEnabled bool
InfluxdbTagsEnabled bool
LibratoTagsEnabled bool
SignalFXTagsEnabled bool
}

// NewParser returns a new line parser
func NewParser() *Parser {
p := Parser{}
return &p
}

// EnableDogstatsdParsing option to enable dogstatsd tag parsing
func (p *Parser) EnableDogstatsdParsing() {
p.DogstatsdTagsEnabled = true
}

// EnableInfluxdbParsing option to enable influxdb tag parsing
func (p *Parser) EnableInfluxdbParsing() {
p.InfluxdbTagsEnabled = true
}

// EnableLibratoParsing option to enable librato tag parsing
func (p *Parser) EnableLibratoParsing() {
p.LibratoTagsEnabled = true
}

// EnableSignalFXParsing option to enable signalfx tag parsing
func (p *Parser) EnableSignalFXParsing() {
p.SignalFXTagsEnabled = true
}

func buildEvent(statType, metric string, value float64, relative bool, labels map[string]string) (event.Event, error) {
switch statType {
case "c":
Expand Down Expand Up @@ -114,57 +148,61 @@ func trimLeftHash(s string) string {
return s
}

func ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
func (p *Parser) ParseDogStatsDTags(component string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) {
if p.DogstatsdTagsEnabled {
lastTagEndIndex := 0
for i, c := range component {
if c == ',' {
tag := component[lastTagEndIndex:i]
lastTagEndIndex = i + 1
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
}
}
}

// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
// If we're not off the end of the string, add the last tag
if lastTagEndIndex < len(component) {
tag := component[lastTagEndIndex:]
parseTag(component, trimLeftHash(tag), ':', labels, tagErrors, logger)
}
}
}

func parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
// check for SignalFx tags first
// `[` delimits start of tags by SignalFx
// `]` delimits end of tags by SignalFx
// https://docs.signalfx.com/en/latest/integrations/agent/monitors/collectd-statsd.html
startIdx := strings.IndexRune(name, '[')
endIdx := strings.IndexRune(name, ']')

switch {
case startIdx != -1 && endIdx != -1:
// good signalfx tags
parseNameTags(name[startIdx+1:endIdx], labels, tagErrors, logger)
return name[:startIdx] + name[endIdx+1:]
case (startIdx != -1) != (endIdx != -1):
// only one bracket, return unparsed
level.Debug(logger).Log("msg", "invalid SignalFx tags, not parsing", "metric", name)
tagErrors.Inc()
return name
func (p *Parser) parseNameAndTags(name string, labels map[string]string, tagErrors prometheus.Counter, logger log.Logger) string {
if p.SignalFXTagsEnabled {
// check for SignalFx tags first
// `[` delimits start of tags by SignalFx
// `]` delimits end of tags by SignalFx
// https://docs.signalfx.com/en/latest/integrations/agent/monitors/collectd-statsd.html
startIdx := strings.IndexRune(name, '[')
endIdx := strings.IndexRune(name, ']')

switch {
case startIdx != -1 && endIdx != -1:
// good signalfx tags
parseNameTags(name[startIdx+1:endIdx], labels, tagErrors, logger)
return name[:startIdx] + name[endIdx+1:]
case (startIdx != -1) != (endIdx != -1):
// only one bracket, return unparsed
level.Debug(logger).Log("msg", "invalid SignalFx tags, not parsing", "metric", name)
tagErrors.Inc()
return name
}
}

for i, c := range name {
// `#` delimits start of tags by Librato
// https://www.librato.com/docs/kb/collect/collection_agents/stastd/#stat-level-tags
// `,` delimits start of tags by InfluxDB
// https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/#introducing-influx-statsd
if c == '#' || c == ',' {
if (c == '#' && p.LibratoTagsEnabled) || (c == ',' && p.InfluxdbTagsEnabled) {
parseNameTags(name[i+1:], labels, tagErrors, logger)
return name[:i]
}
}
return name
}

func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
func (p *Parser) LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceived prometheus.Counter, tagErrors prometheus.Counter, tagsReceived prometheus.Counter, logger log.Logger) event.Events {
events := event.Events{}
if line == "" {
return events
Expand All @@ -178,7 +216,7 @@ func LineToEvents(line string, sampleErrors prometheus.CounterVec, samplesReceiv
}

labels := map[string]string{}
metric := parseNameAndTags(elements[0], labels, tagErrors, logger)
metric := p.parseNameAndTags(elements[0], labels, tagErrors, logger)

var samples []string
if strings.Contains(elements[1], "|#") {
Expand Down Expand Up @@ -252,7 +290,7 @@ samples:
multiplyEvents = int(1 / samplingFactor)
}
case '#':
ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
p.ParseDogStatsDTags(component[1:], labels, tagErrors, logger)
default:
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
Expand Down
Loading