Skip to content

Commit

Permalink
Adding mapping cache
Browse files Browse the repository at this point in the history
Signed-off-by: SpencerMalone <[email protected]>
  • Loading branch information
SpencerMalone committed Apr 26, 2019
1 parent 54cf241 commit 35d1a99
Show file tree
Hide file tree
Showing 20 changed files with 1,531 additions and 42 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ mappings:

Possible values for `match_metric_type` are `gauge`, `counter` and `timer`.

### Mapping cache size and cache replacement polixy

There is a cache used to improve the performance of the metric mapping, that can greatly improvement performance.
The cache has a default maximum of 1000 unique statsd metric names -> prometheus metrics mappings that it can store.
This maximum can be adjust using the `statsd.cache-size` flag.

If the maximum is reached, entries are rotated using the [least recently used replacement policy](https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU)).

If you are using this exporter to reduce the cardinality of your data, a high maximum cache size can be a costly use of memory.


### Time series expiration

The `ttl` parameter can be used to define the expiration time for stale metrics.
Expand Down
20 changes: 14 additions & 6 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func TestNegativeCounter(t *testing.T) {
errorCounter := errorEventStats.WithLabelValues("illegal_negative_counter")
prev := getTelemetryCounterValue(errorCounter)

ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper)
ex.Listen(events)

updated := getTelemetryCounterValue(errorCounter)
Expand Down Expand Up @@ -131,7 +134,7 @@ mappings:
name: "histogram_test"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -180,7 +183,7 @@ mappings:
name: "${1}"
`
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down Expand Up @@ -218,15 +221,20 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
close(events)
}()

ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)

ex := NewExporter(&testMapper)
ex.Listen(events)
}

func TestHistogramUnits(t *testing.T) {
// Start exporter with a synchronous channel
events := make(chan Events)
go func() {
ex := NewExporter(&mapper.MetricMapper{})
testMapper := mapper.MetricMapper{}
testMapper.InitCache(0)
ex := NewExporter(&testMapper)
ex.mapper.Defaults.TimerType = mapper.TimerTypeHistogram
ex.Listen(events)
}()
Expand Down Expand Up @@ -338,7 +346,7 @@ mappings:
`
// Create mapper from config and start an Exporter with a synchronous channel
testMapper := &mapper.MetricMapper{}
err := testMapper.InitFromYAMLString(config)
err := testMapper.InitFromYAMLString(config, 0)
if err != nil {
t.Fatalf("Config load error: %s %s", config, err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-isatty v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607 h1:+7wvV++11s0Okyl1dekihkIiCIYDz+Qk2LvxAShINU4=
github.com/howeyc/fsnotify v0.0.0-20151003194602-f0c08ee9c607/go.mod h1:41HzSPxBGeFRQKEEwgh49TRw/nKBsYZ2cF1OzPjSJsA=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
Expand Down
11 changes: 7 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func tcpAddrFromString(addr string) *net.TCPAddr {
}
}

func watchConfig(fileName string, mapper *mapper.MetricMapper) {
func watchConfig(fileName string, mapper *mapper.MetricMapper, cacheSize int) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
Expand All @@ -106,7 +106,7 @@ func watchConfig(fileName string, mapper *mapper.MetricMapper) {
select {
case ev := <-watcher.Event:
log.Infof("Config file changed (%s), attempting reload", ev)
err = mapper.InitFromFile(fileName)
err = mapper.InitFromFile(fileName, cacheSize)
if err != nil {
log.Errorln("Error reloading config:", err)
configLoads.WithLabelValues("failure").Inc()
Expand Down Expand Up @@ -149,6 +149,7 @@ func main() {
statsdUnixSocketMode = kingpin.Flag("statsd.unixsocket-mode", "The permission mode of the unix socket.").Default("755").String()
mappingConfig = kingpin.Flag("statsd.mapping-config", "Metric mapping configuration file name.").String()
readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int()
cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int()
dumpFSMPath = kingpin.Flag("debug.dump-fsm", "The path to dump internal FSM generated for glob matching as Dot file.").Default("").String()
)

Expand Down Expand Up @@ -247,7 +248,7 @@ func main() {

mapper := &mapper.MetricMapper{MappingsCount: mappingsCount}
if *mappingConfig != "" {
err := mapper.InitFromFile(*mappingConfig)
err := mapper.InitFromFile(*mappingConfig, *cacheSize)
if err != nil {
log.Fatal("Error loading config:", err)
}
Expand All @@ -257,7 +258,9 @@ func main() {
log.Fatal("Error dumping FSM:", err)
}
}
go watchConfig(*mappingConfig, mapper)
go watchConfig(*mappingConfig, mapper, *cacheSize)
} else {
mapper.InitCache(*cacheSize)
}
exporter := NewExporter(mapper)

Expand Down
42 changes: 34 additions & 8 deletions pkg/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/statsd_exporter/pkg/mapper/fsm"
yaml "gopkg.in/yaml.v2"
"time"
Expand Down Expand Up @@ -49,7 +50,8 @@ type MetricMapper struct {
FSM *fsm.FSM
doFSM bool
doRegex bool
mutex sync.Mutex
cache MetricMapperCache
mutex sync.RWMutex

MappingsCount prometheus.Gauge
}
Expand Down Expand Up @@ -83,7 +85,7 @@ var defaultQuantiles = []metricObjective{
{Quantile: 0.99, Error: 0.001},
}

func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
func (m *MetricMapper) InitFromYAMLString(fileContents string, cacheSize int) error {
var n MetricMapper

if err := yaml.Unmarshal([]byte(fileContents), &n); err != nil {
Expand Down Expand Up @@ -189,6 +191,8 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error {

m.Defaults = n.Defaults
m.Mappings = n.Mappings
m.InitCache(cacheSize)

if n.doFSM {
var mappings []string
for _, mapping := range n.Mappings {
Expand All @@ -206,19 +210,37 @@ func (m *MetricMapper) InitFromYAMLString(fileContents string) error {
if m.MappingsCount != nil {
m.MappingsCount.Set(float64(len(n.Mappings)))
}

return nil
}

func (m *MetricMapper) InitFromFile(fileName string) error {
func (m *MetricMapper) InitFromFile(fileName string, cacheSize int) error {
mappingStr, err := ioutil.ReadFile(fileName)
if err != nil {
return err
}
return m.InitFromYAMLString(string(mappingStr))

return m.InitFromYAMLString(string(mappingStr), cacheSize)
}

func (m *MetricMapper) InitCache(cacheSize int) {
if cacheSize == 0 {
m.cache = NewMetricMapperNoopCache()
} else {
cache, err := NewMetricMapperCache(cacheSize)
if err != nil {
log.Fatalf("Unable to setup metric cache. Caused by: %s", err)
}
m.cache = cache
}
}

func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricType) (*MetricMapping, prometheus.Labels, bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
result, cached := m.cache.Get(statsdMetric)
if cached {
return result.Mapping, result.Labels, result.Matched
}
// glob matching
if m.doFSM {
finalState, captures := m.FSM.GetMapping(statsdMetric, string(statsdMetricType))
Expand All @@ -230,17 +252,18 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
for index, formatter := range result.labelFormatters {
labels[result.labelKeys[index]] = formatter.Format(captures)
}

m.cache.AddMatch(statsdMetric, result, labels)

return result, labels, true
} else if !m.doRegex {
// if there's no regex match type, return immediately
m.cache.AddMiss(statsdMetric)
return nil, nil, false
}
}

// regex matching
m.mutex.Lock()
defer m.mutex.Unlock()

for _, mapping := range m.Mappings {
// if a rule don't have regex matching type, the regex field is unset
if mapping.regex == nil {
Expand Down Expand Up @@ -268,8 +291,11 @@ func (m *MetricMapper) GetMapping(statsdMetric string, statsdMetricType MetricTy
labels[label] = string(value)
}

m.cache.AddMatch(statsdMetric, &mapping, labels)

return &mapping, labels, true
}

m.cache.AddMiss(statsdMetric)
return nil, nil, false
}
Loading

0 comments on commit 35d1a99

Please sign in to comment.