Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Migrate ceph to reporter V2 with error handling #11979

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions metricbeat/module/ceph/cluster_health/cluster_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package cluster_health

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the cluster_health MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,22 +67,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

reporter.Event(mb.Event{MetricSetFields: events})

return
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/cluster_health/cluster_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/ceph/cluster_health/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// Timecheck contains part of the response from a HealthRequest
type Timecheck struct {
RoundStatus string `json:"round_status"`
Epoch int64 `json:"epoch"`
Round int64 `json:"round"`
}

// Output is the body of the status response
type Output struct {
OverallStatus string `json:"overall_status"`
Timechecks Timecheck `json:"timechecks"`
}

// HealthRequest represents the response to a cluster health request
type HealthRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down
21 changes: 12 additions & 9 deletions metricbeat/module/ceph/cluster_status/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package cluster_status

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the cluster_status MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,24 +67,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventsMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

for _, event := range events {
reporter.Event(mb.Event{MetricSetFields: event})
reported := reporter.Event(mb.Event{MetricSetFields: event})
if !reported {
return nil
}
}

return
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
42 changes: 21 additions & 21 deletions metricbeat/module/ceph/cluster_status/cluster_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -100,23 +100,23 @@ func TestFetchEventContents(t *testing.T) {
assert.EqualValues(t, 2872860672, pgInfo["used_bytes"])

//check pg_state info
pg_stateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded", pg_stateInfo["state_name"])
assert.EqualValues(t, 109, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "undersized+degraded+peered", pg_stateInfo["state_name"])
assert.EqualValues(t, 101, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+remapped", pg_stateInfo["state_name"])
assert.EqualValues(t, 55, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])

pg_stateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded+remapped", pg_stateInfo["state_name"])
assert.EqualValues(t, 55, pg_stateInfo["count"])
assert.EqualValues(t, 813, pg_stateInfo["version"])
pgStateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded", pgStateInfo["state_name"])
assert.EqualValues(t, 109, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "undersized+degraded+peered", pgStateInfo["state_name"])
assert.EqualValues(t, 101, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+remapped", pgStateInfo["state_name"])
assert.EqualValues(t, 55, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])

pgStateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr)
assert.EqualValues(t, "active+undersized+degraded+remapped", pgStateInfo["state_name"])
assert.EqualValues(t, 55, pgStateInfo["count"])
assert.EqualValues(t, 813, pgStateInfo["version"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
26 changes: 16 additions & 10 deletions metricbeat/module/ceph/cluster_status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// PgState represents placement group state
type PgState struct {
Count int64 `json:"count"`
StateName string `json:"state_name"`
}

// Pgmap represents data from a placement group
type Pgmap struct {
AvailByte int64 `json:"bytes_avail"`
TotalByte int64 `json:"bytes_total"`
Expand All @@ -54,6 +56,7 @@ type Pgmap struct {
PgStates []PgState `json:"pgs_by_state"`
}

// Osdmap represents data from an OSD
type Osdmap struct {
Epoch int64 `json:"epoch"`
Full bool `json:"full"`
Expand All @@ -64,15 +67,18 @@ type Osdmap struct {
RemapedPgs int64 `json:"num_remapped_pgs"`
}

// Osdmap_ is a placeholder for the json parser
type Osdmap_ struct {
Osdmap Osdmap `json:"osdmap"`
}

// Output is the response body
type Output struct {
Pgmap Pgmap `json:"pgmap"`
Osdmap Osdmap_ `json:"osdmap"`
}

// HealthRequest represents the response to a health request
type HealthRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down Expand Up @@ -122,26 +128,26 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
pg["used_bytes"] = pgmap.UsedByte
pg["data_bytes"] = pgmap.DataByte

state_event := common.MapStr{}
state_event["osd"] = osdState
state_event["traffic"] = traffic
state_event["misplace"] = misplace
state_event["degraded"] = degraded
state_event["pg"] = pg
state_event["version"] = pgmap.Version
stateEvent := common.MapStr{}
stateEvent["osd"] = osdState
stateEvent["traffic"] = traffic
stateEvent["misplace"] = misplace
stateEvent["degraded"] = degraded
stateEvent["pg"] = pg
stateEvent["version"] = pgmap.Version

events := []common.MapStr{}
events = append(events, state_event)
events = append(events, stateEvent)

//pg state info
for _, state := range pgmap.PgStates {
state_evn := common.MapStr{
stateEvn := common.MapStr{
"count": state.Count,
"state_name": state.StateName,
"version": pgmap.Version,
}
evt := common.MapStr{
"pg_state": state_evn,
"pg_state": stateEvn,
}
events = append(events, evt)
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/ceph/osd_df/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// Node represents a node object
type Node struct {
ID int64 `json:"id"`
Name string `json:"name"`
Expand All @@ -35,10 +36,12 @@ type Node struct {
DeviceClass string `json:"device_class"`
}

// Output contains a node list from the df response
type Output struct {
Nodes []Node `json:"nodes"`
}

// OsdDfRequest contains the df response
type OsdDfRequest struct {
Status string `json:"status"`
Output Output `json:"output"`
Expand Down
21 changes: 12 additions & 9 deletions metricbeat/module/ceph/osd_df/osd_df.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package osd_df

import (
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -42,11 +44,13 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*helper.HTTP
}

// New creates a new instance of the osd_df MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
http, err := helper.NewHTTP(base)
if err != nil {
Expand All @@ -63,24 +67,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in fetch")
}

events, err := eventsMapping(content)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in mapping")
}

for _, event := range events {
reporter.Event(mb.Event{MetricSetFields: event})
reported := reporter.Event(mb.Event{MetricSetFields: event})
if !reported {
return nil
}
}

return
return nil
}
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/osd_df/osd_df_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/ceph/osd_df/osd_df_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down
Loading