diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 6883914c607d..803a8c92a98d 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -1,13 +1,13 @@ package module import ( - "expvar" "fmt" "sync" "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/metricbeat/mb" @@ -24,9 +24,10 @@ const ( ) var ( - debugf = logp.MakeDebug("module") + debugf = logp.MakeDebug("module") + fetchesLock = sync.Mutex{} - fetches = expvar.NewMap("fetches") + fetches = map[string]*stats{} ) // Wrapper contains the Module and the private data associated with @@ -44,8 +45,17 @@ type Wrapper struct { // running the MetricSet. It contains a pointer to the parent Module. type metricSetWrapper struct { mb.MetricSet - module *Wrapper // Parent Module. - stats *expvar.Map // expvar stats for this MetricSet. + module *Wrapper // Parent Module. + stats *stats // stats for this MetricSet. +} + +// stats bundles common metricset stats +type stats struct { + key string // full stats key + ref uint32 // number of modules/metricsets reusing stats instance + success *monitoring.Int + failures *monitoring.Int + events *monitoring.Int } // NewWrapper create a new Module and its associated MetricSets based @@ -95,15 +105,10 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er debugf("Initializing MetricSet type '%s/%s' for host '%s': %T=%+v", ms.Module().Name(), ms.Name(), ms.Host(), ms, ms) - expMap, err := getMetricSetExpvarMap(mw.Name(), ms.Name()) - if err != nil { - return nil, err - } - msw := &metricSetWrapper{ MetricSet: ms, module: mw, - stats: expMap, + stats: getMetricSetStats(mw.Name(), ms.Name()), } msws = append(msws, msw) } @@ -135,6 +140,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan common.MapStr { wg.Add(len(mw.metricSets)) for _, msw := range mw.metricSets { go func(msw *metricSetWrapper) { + defer releaseStats(msw.stats) defer wg.Done() msw.startFetching(done, out) }(msw) @@ -222,7 +228,7 @@ func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapSt return err } if event != nil { - msw.stats.Add(eventsKey, 1) + msw.stats.events.Add(1) writeEvent(done, out, event) } case mb.EventsFetcher: @@ -231,7 +237,7 @@ func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapSt return err } for _, event := range events { - msw.stats.Add(eventsKey, 1) + msw.stats.events.Add(1) if !writeEvent(done, out, event) { break } @@ -250,9 +256,9 @@ func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher) (common.M elapsed := time.Since(start) if err == nil { - msw.stats.Add(successesKey, 1) + msw.stats.success.Add(1) } else { - msw.stats.Add(failuresKey, 1) + msw.stats.failures.Add(1) } if event, err = createEvent(msw, event, err, start, elapsed); err != nil { @@ -269,7 +275,7 @@ func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common var rtnEvents []common.MapStr if err == nil { - msw.stats.Add(successesKey, 1) + msw.stats.success.Add(1) for _, event := range events { if event, err = createEvent(msw, event, nil, start, elapsed); err != nil { @@ -280,7 +286,7 @@ func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common } } } else { - msw.stats.Add(failuresKey, 1) + msw.stats.failures.Add(1) event, err := createEvent(msw, nil, err, start, elapsed) if err != nil { @@ -311,23 +317,39 @@ func writeEvent(done <-chan struct{}, out chan<- common.MapStr, event common.Map } } -func getMetricSetExpvarMap(module, name string) (*expvar.Map, error) { - key := fmt.Sprintf("%s-%s", module, name) +func getMetricSetStats(module, name string) *stats { + key := fmt.Sprintf("metricbeat.%s.%s", module, name) + fetchesLock.Lock() defer fetchesLock.Unlock() - expVar := fetches.Get(key) - switch m := expVar.(type) { - case nil: - expMap := new(expvar.Map).Init() - fetches.Set(key, expMap) - expMap.Add(successesKey, 0) - expMap.Add(failuresKey, 0) - expMap.Add(eventsKey, 0) - return expMap, nil - case *expvar.Map: - return m, nil - default: - return nil, fmt.Errorf("unexpected expvar.Var type (%T) found for key '%s'", m, key) + if s := fetches[key]; s != nil { + s.ref++ + return s + } + + reg := monitoring.Default.NewRegistry(key) + s := &stats{ + key: key, + ref: 1, + success: monitoring.NewInt(reg, successesKey), + failures: monitoring.NewInt(reg, failuresKey), + events: monitoring.NewInt(reg, eventsKey), } + + fetches[key] = s + return s +} + +func releaseStats(s *stats) { + fetchesLock.Lock() + defer fetchesLock.Unlock() + + s.ref-- + if s.ref > 0 { + return + } + + delete(fetches, s.key) + monitoring.Default.Remove(s.key) }