Skip to content

Commit

Permalink
[Metricbeat] Migrate ceph to reporter V2 with error handling (#11979)
Browse files Browse the repository at this point in the history
* migrate ceph to reporter V2 with error handling
  • Loading branch information
fearful-symmetry authored May 1, 2019
1 parent 8bd2510 commit 0a2ae60
Show file tree
Hide file tree
Showing 20 changed files with 125 additions and 92 deletions.
16 changes: 8 additions & 8 deletions metricbeat/module/ceph/cluster_health/cluster_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package cluster_health

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the cluster_health MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,22 +67,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

reporter.Event(mb.Event{MetricSetFields: events})

return
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/cluster_health/cluster_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/ceph/cluster_health/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// Timecheck contains part of the response from a HealthRequest
type Timecheck struct {
RoundStatus string `json:"round_status"`
Epoch int64 `json:"epoch"`
Round int64 `json:"round"`
}

// Output is the body of the status response
type Output struct {
OverallStatus string `json:"overall_status"`
Timechecks Timecheck `json:"timechecks"`
}

// HealthRequest represents the response to a cluster health request
type HealthRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down
21 changes: 12 additions & 9 deletions metricbeat/module/ceph/cluster_status/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package cluster_status

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the cluster_status MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,24 +67,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventsMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

for _, event := range events {
reporter.Event(mb.Event{MetricSetFields: event})
reported := reporter.Event(mb.Event{MetricSetFields: event})
if !reported {
return nil
}
}

return
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
42 changes: 21 additions & 21 deletions metricbeat/module/ceph/cluster_status/cluster_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -100,23 +100,23 @@ func TestFetchEventContents(t *testing.T) {
assert.EqualValues(t, 2872860672, pgInfo["used_bytes"])

//check pg_state info
pg_stateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded", pg_stateInfo["state_name"])
assert.EqualValues(t, 109, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "undersized+degraded+peered", pg_stateInfo["state_name"])
assert.EqualValues(t, 101, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+remapped", pg_stateInfo["state_name"])
assert.EqualValues(t, 55, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded+remapped", pg_stateInfo["state_name"])
assert.EqualValues(t, 55, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])
pgStateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded", pgStateInfo["state_name"])
assert.EqualValues(t, 109, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "undersized+degraded+peered", pgStateInfo["state_name"])
assert.EqualValues(t, 101, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+remapped", pgStateInfo["state_name"])
assert.EqualValues(t, 55, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded+remapped", pgStateInfo["state_name"])
assert.EqualValues(t, 55, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])
}
26 changes: 16 additions & 10 deletions metricbeat/module/ceph/cluster_status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// PgState represents placement group state
type PgState struct {
Count int64 `json:"count"`
StateName string `json:"state_name"`
}

// Pgmap represents data from a placement group
type Pgmap struct {
AvailByte int64 `json:"bytes_avail"`
TotalByte int64 `json:"bytes_total"`
Expand All @@ -54,6 +56,7 @@ type Pgmap struct {
PgStates []PgState `json:"pgs_by_state"`
}

// Osdmap represents data from an OSD
type Osdmap struct {
Epoch int64 `json:"epoch"`
Full bool `json:"full"`
Expand All @@ -64,15 +67,18 @@ type Osdmap struct {
RemapedPgs int64 `json:"num_remapped_pgs"`
}

// Osdmap_ is a placeholder for the json parser
type Osdmap_ struct {
Osdmap Osdmap `json:"osdmap"`
}

// Output is the response body
type Output struct {
Pgmap Pgmap `json:"pgmap"`
Osdmap Osdmap_ `json:"osdmap"`
}

// HealthRequest represents the response to a health request
type HealthRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down Expand Up @@ -122,26 +128,26 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
pg["used_bytes"] = pgmap.UsedByte
pg["data_bytes"] = pgmap.DataByte

state_event := common.MapStr{}
state_event["osd"] = osdState
state_event["traffic"] = traffic
state_event["misplace"] = misplace
state_event["degraded"] = degraded
state_event["pg"] = pg
state_event["version"] = pgmap.Version
stateEvent := common.MapStr{}
stateEvent["osd"] = osdState
stateEvent["traffic"] = traffic
stateEvent["misplace"] = misplace
stateEvent["degraded"] = degraded
stateEvent["pg"] = pg
stateEvent["version"] = pgmap.Version

events := []common.MapStr{}
events = append(events, state_event)
events = append(events, stateEvent)

//pg state info
for _, state := range pgmap.PgStates {
state_evn := common.MapStr{
stateEvn := common.MapStr{
"count": state.Count,
"state_name": state.StateName,
"version": pgmap.Version,
}
evt := common.MapStr{
"pg_state": state_evn,
"pg_state": stateEvn,
}
events = append(events, evt)
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/ceph/osd_df/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// Node represents a node object
type Node struct {
ID int64 `json:"id"`
Name string `json:"name"`
Expand All @@ -35,10 +36,12 @@ type Node struct {
DeviceClass string `json:"device_class"`
}

// Output contains a node list from the df response
type Output struct {
Nodes []Node `json:"nodes"`
}

// OsdDfRequest contains the df response
type OsdDfRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down
21 changes: 12 additions & 9 deletions metricbeat/module/ceph/osd_df/osd_df.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package osd_df

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the osd_df MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,24 +67,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventsMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

for _, event := range events {
reporter.Event(mb.Event{MetricSetFields: event})
reported := reporter.Event(mb.Event{MetricSetFields: event})
if !reported {
return nil
}
}

return
return nil
}
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/osd_df/osd_df_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/osd_df/osd_df_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down
Loading

0 comments on commit 0a2ae60

Please sign in to comment.