Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding mapping cache #198

Merged
merged 1 commit into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ mappings:

Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.

### Mapping cache size and cache replacement polixy

There is a cache used to improve the performance of the metric mapping, that can greatly improvement performance.
The cache has a default maximum of 1000 unique statsd metric names -> prometheus metrics mappings that it can store.
This maximum can be adjust using the `statsd.cache-size` flag.

If the maximum is reached, entries are rotated using the [least recently used replacement policy](https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)).

If you are using this exporter to reduce the cardinality of your data, a high maximum cache size can be a costly use of memory.


### Time series expiration

The `ttl` parameter can be used to define the expiration time for stale metrics.
Expand Down
20 changes: 14 additions & 6 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func TestNegativeCounter(t *testing.T) {
errorCounter := errorEventStats.WithLabelValues("illegal_negative_counter")
prev := getTelemetryCounterValue(errorCounter)

ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -131,7 +134,7 @@ mappings:
name: "histogram_test"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -180,7 +183,7 @@ mappings:
name: "${1}"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -218,15 +221,20 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
close(events)
}()

ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper)
ex.Listen(events)
}

func TestHistogramUnits(t *testing.T) {
// Start exporter with a synchronous channel
events := make(chan Events)
go func() {
ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper)
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
ex.Listen(events)
}()
Expand Down Expand Up @@ -338,7 +346,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607 h1:+7wvV++11s0Okyl1dekihkIiCIYDz+Qk2LvxAShINU4=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607/go.mod h1:41HzSPxBGeFRQKEEwgh49TRw/nKBsYZ2cF1OzPjSJsA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
Expand Down
11 changes: 7 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func tcpAddrFromString(addr string) *net.TCPAddr {
}
}

func watchConfig(fileName string, mapper *mapper.MetricMapper) {
func watchConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
Expand All @@ -106,7 +106,7 @@ func watchConfig(fileName string, mapper *mapper.MetricMapper) {
select {
case ev := <-watcher.Event:
log.Infof("Config file changed (%s), attempting reload", ev)
err = mapper.InitFromFile(fileName)
err = mapper.InitFromFile(fileName, cacheSize)
if err != nil {
log.Errorln("Error reloading config:", err)
configLoads.WithLabelValues("failure").Inc()
Expand Down Expand Up @@ -149,6 +149,7 @@ func main() {
statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
)

Expand Down Expand Up @@ -247,7 +248,7 @@ func main() {

mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig)
err := mapper.InitFromFile(*mappingConfig, *cacheSize)
if err != nil {
log.Fatal("Error loading config:", err)
}
Expand All @@ -257,7 +258,9 @@ func main() {
log.Fatal("Error dumping FSM:", err)
}
}
go watchConfig(*mappingConfig, mapper)
go watchConfig(*mappingConfig, mapper, *cacheSize)
} else {
mapper.InitCache(*cacheSize)
}
exporter := NewExporter(mapper)

Expand Down
42 changes: 34 additions & 8 deletions pkg/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
yaml "gopkg.in/yaml.v2"
"time"
Expand Down Expand Up @@ -49,7 +50,8 @@ type MetricMapper struct {
FSM *fsm.FSM
doFSM bool
doRegex bool
mutex sync.Mutex
cache MetricMapperCache
mutex sync.RWMutex

MappingsCount prometheus.Gauge
}
Expand Down Expand Up @@ -83,7 +85,7 @@ var defaultQuantiles = []metricObjective{
{Quantile: 0.99, Error: 0.001},
}

func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) error {
var n MetricMapper

if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil {
Expand Down Expand Up @@ -189,6 +191,8 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error {

m.Defaults = n.Defaults
m.Mappings = n.Mappings
m.InitCache(cacheSize)

if n.doFSM {
var mappings []string
for _, mapping := range n.Mappings {
Expand All @@ -206,19 +210,37 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
if m.MappingsCount != nil {
m.MappingsCount.Set(float64(len(n.Mappings)))
}

return nil
}

func (m *MetricMapper) InitFromFile(fileName string) error {
func (m *MetricMapper) InitFromFile(fileName string, cacheSize int) error {
mappingStr, err := ioutil.ReadFile(fileName)
if err != nil {
return err
}
return m.InitFromYAMLString(string(mappingStr))

return m.InitFromYAMLString(string(mappingStr), cacheSize)
}

func (m *MetricMapper) InitCache(cacheSize int) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache()
} else {
cache, err := NewMetricMapperCache(cacheSize)
if err != nil {
log.Fatalf("Unable to setup metric cache. Caused by: %s", err)
}
m.cache = cache
}
}

func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) {
m.mutex.RLock()
matthiasr marked this conversation as resolved.
Show resolved Hide resolved
defer m.mutex.RUnlock()
result, cached := m.cache.Get(statsdMetric)
if cached {
return result.Mapping, result.Labels, result.Matched
}
// glob matching
if m.doFSM {
finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType))
Expand All @@ -230,17 +252,18 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
for index, formatter := range result.labelFormatters {
labels[result.labelKeys[index]] = formatter.Format(captures)
}

m.cache.AddMatch(statsdMetric, result, labels)

return result, labels, true
} else if !m.doRegex {
// if there's no regex match type, return immediately
m.cache.AddMiss(statsdMetric)
return nil, nil, false
}
}

// regex matching
m.mutex.Lock()
defer m.mutex.Unlock()

for _, mapping := range m.Mappings {
// if a rule don't have regex matching type, the regex field is unset
if mapping.regex == nil {
Expand Down Expand Up @@ -268,8 +291,11 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
labels[label] = string(value)
}

m.cache.AddMatch(statsdMetric, &mapping, labels)

return &mapping, labels, true
}

m.cache.AddMiss(statsdMetric)
return nil, nil, false
}
Loading