Skip to content

Commit

Permalink
Add metricset.period to Metricbeat events (#13242)
Browse files Browse the repository at this point in the history
Having the period as part of each event makes it possible
for Kibana or ML to predict when the next event is
potentially missing or delayed based on the period of the
previous events. It can always be that the period changed
but as soon as the next event comes in, this can be used
as the new expected period.

This field is not added to push fetchers, as they don't do
periodic fetching.

Fixes #12616

Co-authored-by: ruflin <[email protected]>
  • Loading branch information
jsoriano and ruflin authored Aug 23, 2019
1 parent 9cfc4e3 commit fea2db3
Show file tree
Hide file tree
Showing 71 changed files with 18,745 additions and 17,325 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add cgroup memory stats to docker/memory metricset {pull}12916[12916]
- Add AWS elb metricset. {pull}12952[12952] {issue}11701[11701]
- Add AWS ebs metricset. {pull}13167[13167] {issue}11699[11699]
- Add `metricset.period` field with the configured fetching period. {pull}13242[13242] {issue}12616[12616]
- Add rate metrics for ec2 metricset. {pull}13203[13203]

*Packetbeat*
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
description: >
The name of the metricset that generated the event.
- name: metricset.period
type: integer
description: >
Current data collection period for this event in milliseconds.
- name: process.pgid
type: long
description: >
Expand Down
10 changes: 10 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3405,6 +3405,16 @@ alias to: event.module
The name of the metricset that generated the event.
--
*`metricset.period`*::
+
--
Current data collection period for this event in milliseconds.
type: integer
--
*`process.pgid`*::
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/include/fields/fields.go

Large diffs are not rendered by default.

25 changes: 19 additions & 6 deletions metricbeat/mb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Event struct {
Host string // Host from which the data was collected.
Service string // Service type
Took time.Duration // Amount of time it took to collect the event data.
Period time.Duration // Period that is set to retrieve the events

DisableTimeSeries bool // true if the event doesn't contain timeseries data
}
Expand Down Expand Up @@ -110,17 +111,26 @@ func (e *Event) BeatEvent(module, metricSet string, modifiers ...EventModifier)

// AddMetricSetInfo is an EventModifier that adds information about the
// MetricSet that generated the event. It will always add the metricset and
// module names. And it will add the host, namespace, and rtt (round-trip time
// in microseconds) values if they are non-zero values.
// module names. And it will add the host, period (in milliseconds), and
// duration (round-trip time in nanoseconds) values if they are non-zero
// values.
//
// {
// "event": {
// "dataset": "apache.status",
// "duration": 115,
// "module": "apache"
// },
// "service": {
// "address": "127.0.0.1",
// },
// "metricset": {
// "host": "apache",
// "module": "apache",
// "name": "status",
// "rtt": 115
// "period": 10000
// }
// }
//
func AddMetricSetInfo(module, metricset string, event *Event) {

if event.Namespace == "" {
event.Namespace = fmt.Sprintf("%s.%s", module, metricset)
}
Expand All @@ -141,6 +151,9 @@ func AddMetricSetInfo(module, metricset string, event *Event) {
if event.Took > 0 {
e.Put("event.duration", event.Took/time.Nanosecond)
}
if event.Period > 0 {
e.Put("metricset.period", event.Period/time.Millisecond)
}

if event.RootFields == nil {
event.RootFields = e
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func ExampleWrapper() {
// }
// },
// "metricset": {
// "name": "eventfetcher"
// "name": "eventfetcher",
// "period": 10000
// },
// "service": {
// "type": "fake"
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type metricSetWrapper struct {
mb.MetricSet
module *Wrapper // Parent Module.
stats *stats // stats for this MetricSet.

periodic bool // Set to true if this metricset is a periodic fetcher
}

// stats bundles common metricset stats.
Expand Down Expand Up @@ -208,6 +210,9 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter reporter) {
// Indicate that it has been started as periodic fetcher
msw.periodic = true

// Fetch immediately.
msw.fetch(ctx, reporter)

Expand Down Expand Up @@ -367,6 +372,9 @@ func (r reporterV2) Event(event mb.Event) bool {
if event.Took == 0 && !r.start.IsZero() {
event.Took = time.Since(r.start)
}
if r.msw.periodic {
event.Period = r.msw.Module().Config().Period
}

if event.Timestamp.IsZero() {
if !r.start.IsZero() {
Expand Down
47 changes: 45 additions & 2 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) {
<-output
close(done)

// Validate that the channel is closed after receiving the two
// initial events.
// Validate that the channel is closed after receiving the event.
select {
case _, ok := <-output:
if !ok {
Expand All @@ -227,3 +226,47 @@ func TestWrapperOfPushMetricSet(t *testing.T) {
}
}
}

func TestPeriodIsAddedToEvent(t *testing.T) {
cases := map[string]struct {
metricset string
hasPeriod bool
}{
"fetch metricset events should have period": {
metricset: eventFetcherName,
hasPeriod: true,
},
"push metricset events should not have period": {
metricset: pushMetricSetName,
hasPeriod: false,
},
}

registry := newTestRegistry(t)

for title, c := range cases {
t.Run(title, func(t *testing.T) {
hosts := []string{"alpha"}
config := newConfig(t, map[string]interface{}{
"module": moduleName,
"metricsets": []string{c.metricset},
"hosts": hosts,
})

m, err := module.NewWrapper(config, registry, module.WithMetricSetInfo())
if err != nil {
t.Fatal(err)
}

done := make(chan struct{})
defer close(done)

output := m.Start(done)

event := <-output

hasPeriod, _ := event.Fields.HasKey("metricset.period")
assert.Equal(t, c.hasPeriod, hasPeriod, "has metricset.period in event %+v", event)
})
}
}
1 change: 1 addition & 0 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func StandardizeEvent(ms mb.MetricSet, e mb.Event, modifiers ...mb.EventModifier
e.Timestamp = startTime
e.Took = 115 * time.Microsecond
e.Host = ms.Host()
e.Period = 10 * time.Second
if e.Namespace == "" {
e.Namespace = ms.Registration().Namespace
}
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/apache/status/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
"module": "apache"
},
"metricset": {
"name": "status"
"name": "status",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"system": 0.2,
"user": 0.16
},
"hostname": "127.0.0.1:34545",
"hostname": "127.0.0.1:37579",
"load": {
"1": 2,
"15": 1.91,
Expand Down Expand Up @@ -58,7 +58,8 @@
"module": "apache"
},
"metricset": {
"name": "status"
"name": "status",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/ceph/monitor_health/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"module": "ceph"
},
"metricset": {
"name": "monitor_health"
"name": "monitor_health",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"module": "ceph"
},
"metricset": {
"name": "monitor_health"
"name": "monitor_health",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/couchbase/cluster/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"module": "couchbase"
},
"metricset": {
"name": "cluster"
"name": "cluster",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"module": "couchbase"
},
"metricset": {
"name": "cluster"
"name": "cluster",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"module": "couchbase"
},
"metricset": {
"name": "cluster"
"name": "cluster",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/couchbase/node/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"module": "couchbase"
},
"metricset": {
"name": "node"
"name": "node",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"module": "couchbase"
},
"metricset": {
"name": "node"
"name": "node",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
"module": "couchbase"
},
"metricset": {
"name": "node"
"name": "node",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/dropwizard/collector/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"module": "dropwizard"
},
"metricset": {
"name": "collector"
"name": "collector",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"module": "dropwizard"
},
"metricset": {
"name": "collector"
"name": "collector",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down Expand Up @@ -75,7 +76,8 @@
"module": "dropwizard"
},
"metricset": {
"name": "collector"
"name": "collector",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
"module": "etcd"
},
"metricset": {
"name": "metrics"
"name": "metrics",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/http/json/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
}
},
"metricset": {
"name": "json"
"name": "json",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
}
},
"metricset": {
"name": "json"
"name": "json",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kibana/status/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
}
},
"metricset": {
"name": "status"
"name": "status",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
}
},
"metricset": {
"name": "status"
"name": "status",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
}
},
"metricset": {
"name": "status"
"name": "status",
"period": 10000
},
"service": {
"address": "127.0.0.1:55555",
Expand Down
Loading

0 comments on commit fea2db3

Please sign in to comment.