Skip to content

Commit

Permalink
Metricbeat module stats based on libbeat/monitoring (#3745)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored and ruflin committed Mar 15, 2017
1 parent 529218e commit 91970fd
Showing 1 changed file with 54 additions and 32 deletions.
86 changes: 54 additions & 32 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package module

import (
"expvar"
"fmt"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/metricbeat/mb"

Expand All @@ -24,9 +24,10 @@ const (
)

var (
debugf = logp.MakeDebug("module")
debugf = logp.MakeDebug("module")

fetchesLock = sync.Mutex{}
fetches = expvar.NewMap("fetches")
fetches = map[string]*stats{}
)

// Wrapper contains the Module and the private data associated with
Expand All @@ -44,8 +45,17 @@ type Wrapper struct {
// running the MetricSet. It contains a pointer to the parent Module.
type metricSetWrapper struct {
mb.MetricSet
module *Wrapper // Parent Module.
stats *expvar.Map // expvar stats for this MetricSet.
module *Wrapper // Parent Module.
stats *stats // stats for this MetricSet.
}

// stats bundles common metricset stats
type stats struct {
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int
failures *monitoring.Int
events *monitoring.Int
}

// NewWrapper create a new Module and its associated MetricSets based
Expand Down Expand Up @@ -95,15 +105,10 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er
debugf("Initializing MetricSet type '%s/%s' for host '%s': %T=%+v",
ms.Module().Name(), ms.Name(), ms.Host(), ms, ms)

expMap, err := getMetricSetExpvarMap(mw.Name(), ms.Name())
if err != nil {
return nil, err
}

msw := &metricSetWrapper{
MetricSet: ms,
module: mw,
stats: expMap,
stats: getMetricSetStats(mw.Name(), ms.Name()),
}
msws = append(msws, msw)
}
Expand Down Expand Up @@ -135,6 +140,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan common.MapStr {
wg.Add(len(mw.metricSets))
for _, msw := range mw.metricSets {
go func(msw *metricSetWrapper) {
defer releaseStats(msw.stats)
defer wg.Done()
msw.startFetching(done, out)
}(msw)
Expand Down Expand Up @@ -222,7 +228,7 @@ func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapSt
return err
}
if event != nil {
msw.stats.Add(eventsKey, 1)
msw.stats.events.Add(1)
writeEvent(done, out, event)
}
case mb.EventsFetcher:
Expand All @@ -231,7 +237,7 @@ func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapSt
return err
}
for _, event := range events {
msw.stats.Add(eventsKey, 1)
msw.stats.events.Add(1)
if !writeEvent(done, out, event) {
break
}
Expand All @@ -250,9 +256,9 @@ func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher) (common.M
elapsed := time.Since(start)

if err == nil {
msw.stats.Add(successesKey, 1)
msw.stats.success.Add(1)
} else {
msw.stats.Add(failuresKey, 1)
msw.stats.failures.Add(1)
}

if event, err = createEvent(msw, event, err, start, elapsed); err != nil {
Expand All @@ -269,7 +275,7 @@ func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common

var rtnEvents []common.MapStr
if err == nil {
msw.stats.Add(successesKey, 1)
msw.stats.success.Add(1)

for _, event := range events {
if event, err = createEvent(msw, event, nil, start, elapsed); err != nil {
Expand All @@ -280,7 +286,7 @@ func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common
}
}
} else {
msw.stats.Add(failuresKey, 1)
msw.stats.failures.Add(1)

event, err := createEvent(msw, nil, err, start, elapsed)
if err != nil {
Expand Down Expand Up @@ -311,23 +317,39 @@ func writeEvent(done <-chan struct{}, out chan<- common.MapStr, event common.Map
}
}

func getMetricSetExpvarMap(module, name string) (*expvar.Map, error) {
key := fmt.Sprintf("%s-%s", module, name)
func getMetricSetStats(module, name string) *stats {
key := fmt.Sprintf("metricbeat.%s.%s", module, name)

fetchesLock.Lock()
defer fetchesLock.Unlock()

expVar := fetches.Get(key)
switch m := expVar.(type) {
case nil:
expMap := new(expvar.Map).Init()
fetches.Set(key, expMap)
expMap.Add(successesKey, 0)
expMap.Add(failuresKey, 0)
expMap.Add(eventsKey, 0)
return expMap, nil
case *expvar.Map:
return m, nil
default:
return nil, fmt.Errorf("unexpected expvar.Var type (%T) found for key '%s'", m, key)
if s := fetches[key]; s != nil {
s.ref++
return s
}

reg := monitoring.Default.NewRegistry(key)
s := &stats{
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
}

fetches[key] = s
return s
}

func releaseStats(s *stats) {
fetchesLock.Lock()
defer fetchesLock.Unlock()

s.ref--
if s.ref > 0 {
return
}

delete(fetches, s.key)
monitoring.Default.Remove(s.key)
}

0 comments on commit 91970fd

Please sign in to comment.