From 3c67717d9c85a090aa63a35d03bbeedd89e1541a Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Mon, 29 Apr 2019 10:27:57 -0500 Subject: [PATCH 1/4] migrate ceph to reporter V2 with error handling --- .../ceph/cluster_health/cluster_health.go | 15 ++++--- .../cluster_health_integration_test.go | 4 +- .../cluster_health/cluster_health_test.go | 4 +- metricbeat/module/ceph/cluster_health/data.go | 3 ++ .../ceph/cluster_status/cluster_status.go | 20 +++++---- .../cluster_status_integration_test.go | 4 +- .../cluster_status/cluster_status_test.go | 42 +++++++++---------- metricbeat/module/ceph/cluster_status/data.go | 26 +++++++----- metricbeat/module/ceph/osd_df/data.go | 3 ++ metricbeat/module/ceph/osd_df/osd_df.go | 20 +++++---- .../ceph/osd_df/osd_df_integration_test.go | 4 +- metricbeat/module/ceph/osd_df/osd_df_test.go | 4 +- metricbeat/module/ceph/osd_tree/data.go | 3 ++ metricbeat/module/ceph/osd_tree/osd_tree.go | 15 ++++--- .../osd_tree/osd_tree_integration_test.go | 4 +- .../module/ceph/osd_tree/osd_tree_test.go | 4 +- metricbeat/module/ceph/pool_disk/data.go | 8 +++- metricbeat/module/ceph/pool_disk/pool_disk.go | 16 ++++--- .../pool_disk/pool_disk_integration_test.go | 4 +- .../module/ceph/pool_disk/pool_disk_test.go | 4 +- 20 files changed, 116 insertions(+), 91 deletions(-) diff --git a/metricbeat/module/ceph/cluster_health/cluster_health.go b/metricbeat/module/ceph/cluster_health/cluster_health.go index e1579b21c502..8b38d73a4e72 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/pkg/errors" ) const ( @@ -42,11 +43,13 @@ func init() { ) } +// MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet *helper.HTTP } +// New creates a new instance of the docker memory MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { @@ -63,22 +66,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in fetch") } events, err := eventMapping(content) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } reporter.Event(mb.Event{MetricSetFields: events}) - return + return nil } diff --git a/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go b/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go index fe876b212191..de9c05c50211 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/cluster_health/cluster_health_test.go b/metricbeat/module/ceph/cluster_health/cluster_health_test.go index 5cc1cdb63667..9f1d15fb2a54 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health_test.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health_test.go @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } diff --git a/metricbeat/module/ceph/cluster_health/data.go b/metricbeat/module/ceph/cluster_health/data.go index f624d5bdacb7..46b25efdacc7 100644 --- a/metricbeat/module/ceph/cluster_health/data.go +++ b/metricbeat/module/ceph/cluster_health/data.go @@ -25,17 +25,20 @@ import ( "github.com/elastic/beats/libbeat/common" ) +//Timecheck contains part of the response from a HealthRequest type Timecheck struct { RoundStatus string `json:"round_status"` Epoch int64 `json:"epoch"` Round int64 `json:"round"` } +//Output is the body of the status response type Output struct { OverallStatus string `json:"overall_status"` Timechecks Timecheck `json:"timechecks"` } +//HealthRequest represents the response to a cluster health request type HealthRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/cluster_status/cluster_status.go b/metricbeat/module/ceph/cluster_status/cluster_status.go index 029431ef3be4..76e759a611f0 100644 --- a/metricbeat/module/ceph/cluster_status/cluster_status.go +++ b/metricbeat/module/ceph/cluster_status/cluster_status.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/pkg/errors" ) const ( @@ -42,11 +43,13 @@ func init() { ) } +// MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet *helper.HTTP } +// New creates a new instance of the docker memory MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { @@ -63,24 +66,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in fetch") } events, err := eventsMapping(content) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } for _, event := range events { - reporter.Event(mb.Event{MetricSetFields: event}) + reported := reporter.Event(mb.Event{MetricSetFields: event}) + if !reported { + return nil + } } - return + return nil } diff --git a/metricbeat/module/ceph/cluster_status/cluster_status_integration_test.go b/metricbeat/module/ceph/cluster_status/cluster_status_integration_test.go index d6b43ae6f93d..d3b20eefd212 100644 --- a/metricbeat/module/ceph/cluster_status/cluster_status_integration_test.go +++ b/metricbeat/module/ceph/cluster_status/cluster_status_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/cluster_status/cluster_status_test.go b/metricbeat/module/ceph/cluster_status/cluster_status_test.go index c53d2a895f6b..c9066e707225 100644 --- a/metricbeat/module/ceph/cluster_status/cluster_status_test.go +++ b/metricbeat/module/ceph/cluster_status/cluster_status_test.go @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } @@ -100,23 +100,23 @@ func TestFetchEventContents(t *testing.T) { assert.EqualValues(t, 2872860672, pgInfo["used_bytes"]) //check pg_state info - pg_stateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr) - assert.EqualValues(t, "active+undersized+degraded", pg_stateInfo["state_name"]) - assert.EqualValues(t, 109, pg_stateInfo["count"]) - assert.EqualValues(t, 813, pg_stateInfo["version"]) - - pg_stateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr) - assert.EqualValues(t, "undersized+degraded+peered", pg_stateInfo["state_name"]) - assert.EqualValues(t, 101, pg_stateInfo["count"]) - assert.EqualValues(t, 813, pg_stateInfo["version"]) - - pg_stateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr) - assert.EqualValues(t, "active+remapped", pg_stateInfo["state_name"]) - assert.EqualValues(t, 55, pg_stateInfo["count"]) - assert.EqualValues(t, 813, pg_stateInfo["version"]) - - pg_stateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr) - assert.EqualValues(t, "active+undersized+degraded+remapped", pg_stateInfo["state_name"]) - assert.EqualValues(t, 55, pg_stateInfo["count"]) - assert.EqualValues(t, 813, pg_stateInfo["version"]) + pgStateInfo := events[1].MetricSetFields["pg_state"].(common.MapStr) + assert.EqualValues(t, "active+undersized+degraded", pgStateInfo["state_name"]) + assert.EqualValues(t, 109, pgStateInfo["count"]) + assert.EqualValues(t, 813, pgStateInfo["version"]) + + pgStateInfo = events[2].MetricSetFields["pg_state"].(common.MapStr) + assert.EqualValues(t, "undersized+degraded+peered", pgStateInfo["state_name"]) + assert.EqualValues(t, 101, pgStateInfo["count"]) + assert.EqualValues(t, 813, pgStateInfo["version"]) + + pgStateInfo = events[3].MetricSetFields["pg_state"].(common.MapStr) + assert.EqualValues(t, "active+remapped", pgStateInfo["state_name"]) + assert.EqualValues(t, 55, pgStateInfo["count"]) + assert.EqualValues(t, 813, pgStateInfo["version"]) + + pgStateInfo = events[4].MetricSetFields["pg_state"].(common.MapStr) + assert.EqualValues(t, "active+undersized+degraded+remapped", pgStateInfo["state_name"]) + assert.EqualValues(t, 55, pgStateInfo["count"]) + assert.EqualValues(t, 813, pgStateInfo["version"]) } diff --git a/metricbeat/module/ceph/cluster_status/data.go b/metricbeat/module/ceph/cluster_status/data.go index a9544ae5f9ff..0368b965bd62 100644 --- a/metricbeat/module/ceph/cluster_status/data.go +++ b/metricbeat/module/ceph/cluster_status/data.go @@ -25,11 +25,13 @@ import ( "github.com/elastic/beats/libbeat/common" ) +//PgState represents placement group state type PgState struct { Count int64 `json:"count"` StateName string `json:"state_name"` } +//Pgmap represents data from a placement group type Pgmap struct { AvailByte int64 `json:"bytes_avail"` TotalByte int64 `json:"bytes_total"` @@ -54,6 +56,7 @@ type Pgmap struct { PgStates []PgState `json:"pgs_by_state"` } +//Osdmap represents data from an OSD type Osdmap struct { Epoch int64 `json:"epoch"` Full bool `json:"full"` @@ -64,15 +67,18 @@ type Osdmap struct { RemapedPgs int64 `json:"num_remapped_pgs"` } +//Osdmap_ is a placeholder for the json parser type Osdmap_ struct { Osdmap Osdmap `json:"osdmap"` } +//Output is the response body type Output struct { Pgmap Pgmap `json:"pgmap"` Osdmap Osdmap_ `json:"osdmap"` } +//HealthRequest represents the response to a health request type HealthRequest struct { Status string `json:"status"` Output Output `json:"output"` @@ -122,26 +128,26 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { pg["used_bytes"] = pgmap.UsedByte pg["data_bytes"] = pgmap.DataByte - state_event := common.MapStr{} - state_event["osd"] = osdState - state_event["traffic"] = traffic - state_event["misplace"] = misplace - state_event["degraded"] = degraded - state_event["pg"] = pg - state_event["version"] = pgmap.Version + stateEvent := common.MapStr{} + stateEvent["osd"] = osdState + stateEvent["traffic"] = traffic + stateEvent["misplace"] = misplace + stateEvent["degraded"] = degraded + stateEvent["pg"] = pg + stateEvent["version"] = pgmap.Version events := []common.MapStr{} - events = append(events, state_event) + events = append(events, stateEvent) //pg state info for _, state := range pgmap.PgStates { - state_evn := common.MapStr{ + stateEvn := common.MapStr{ "count": state.Count, "state_name": state.StateName, "version": pgmap.Version, } evt := common.MapStr{ - "pg_state": state_evn, + "pg_state": stateEvn, } events = append(events, evt) } diff --git a/metricbeat/module/ceph/osd_df/data.go b/metricbeat/module/ceph/osd_df/data.go index d8888f034f4f..08b08c39f257 100644 --- a/metricbeat/module/ceph/osd_df/data.go +++ b/metricbeat/module/ceph/osd_df/data.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/libbeat/common" ) +//Node represents a node object type Node struct { ID int64 `json:"id"` Name string `json:"name"` @@ -35,10 +36,12 @@ type Node struct { DeviceClass string `json:"device_class"` } +//Output contains a node list from the df response type Output struct { Nodes []Node `json:"nodes"` } +//OsdDfRequest contains the df response type OsdDfRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/osd_df/osd_df.go b/metricbeat/module/ceph/osd_df/osd_df.go index 001150832ff6..b0de7fd0d8a8 100644 --- a/metricbeat/module/ceph/osd_df/osd_df.go +++ b/metricbeat/module/ceph/osd_df/osd_df.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/pkg/errors" ) const ( @@ -42,11 +43,13 @@ func init() { ) } +// MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet *helper.HTTP } +// New creates a new instance of the docker memory MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { @@ -63,24 +66,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in fetch") } events, err := eventsMapping(content) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } for _, event := range events { - reporter.Event(mb.Event{MetricSetFields: event}) + reported := reporter.Event(mb.Event{MetricSetFields: event}) + if !reported { + return nil + } } - return + return nil } diff --git a/metricbeat/module/ceph/osd_df/osd_df_integration_test.go b/metricbeat/module/ceph/osd_df/osd_df_integration_test.go index 2ca2dde0d821..959579be89e0 100644 --- a/metricbeat/module/ceph/osd_df/osd_df_integration_test.go +++ b/metricbeat/module/ceph/osd_df/osd_df_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/osd_df/osd_df_test.go b/metricbeat/module/ceph/osd_df/osd_df_test.go index 088cb172e9a3..cc9683956970 100644 --- a/metricbeat/module/ceph/osd_df/osd_df_test.go +++ b/metricbeat/module/ceph/osd_df/osd_df_test.go @@ -49,8 +49,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } diff --git a/metricbeat/module/ceph/osd_tree/data.go b/metricbeat/module/ceph/osd_tree/data.go index bc0290acddfa..8a3b47963f7c 100644 --- a/metricbeat/module/ceph/osd_tree/data.go +++ b/metricbeat/module/ceph/osd_tree/data.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +//Node represents a node object type Node struct { ID int64 `json:"id"` Name string `json:"name"` @@ -42,10 +43,12 @@ type Node struct { DeviceClass string `json:"device_class"` } +//Output contains a node list from the df response type Output struct { Nodes []Node `json:"nodes"` } +//OsdTreeRequest is a OSD response object type OsdTreeRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/osd_tree/osd_tree.go b/metricbeat/module/ceph/osd_tree/osd_tree.go index b936ae93b14d..7cf9cb3e3d98 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/pkg/errors" ) const ( @@ -42,11 +43,13 @@ func init() { ) } +// MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet *helper.HTTP } +// New creates a new instance of the docker memory MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { @@ -63,24 +66,20 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in fetch") } events, err := eventsMapping(content) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in mapping") } for _, event := range events { reporter.Event(mb.Event{MetricSetFields: event}) } - return + return nil } diff --git a/metricbeat/module/ceph/osd_tree/osd_tree_integration_test.go b/metricbeat/module/ceph/osd_tree/osd_tree_integration_test.go index 8b2f4b56bb05..e53aa740d7e1 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree_integration_test.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/osd_tree/osd_tree_test.go b/metricbeat/module/ceph/osd_tree/osd_tree_test.go index c8ce52fdf876..a92f40a8135e 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree_test.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree_test.go @@ -49,8 +49,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } diff --git a/metricbeat/module/ceph/pool_disk/data.go b/metricbeat/module/ceph/pool_disk/data.go index 7dba27044c6f..a1b81aa270a5 100644 --- a/metricbeat/module/ceph/pool_disk/data.go +++ b/metricbeat/module/ceph/pool_disk/data.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +//Stats represents the statistics for a pool type Stats struct { BytesUsed int64 `json:"bytes_used"` MaxAvail int64 `json:"max_avail"` @@ -31,16 +32,19 @@ type Stats struct { KbUsed int64 `json:"kb_used"` } +//Pool represents a given Ceph pool type Pool struct { - Id int64 `json:"id"` + ID int64 `json:"id"` Name string `json:"name"` Stats Stats `json:"stats"` } +//Output is a list of pools from the response type Output struct { Pools []Pool `json:"pools"` } +//DfRequest is the df response object type DfRequest struct { Status string `json:"status"` Output Output `json:"output"` @@ -58,7 +62,7 @@ func eventsMapping(content []byte) []common.MapStr { for _, Pool := range d.Output.Pools { event := common.MapStr{ "name": Pool.Name, - "id": Pool.Id, + "id": Pool.ID, "stats": common.MapStr{ "used": common.MapStr{ "bytes": Pool.Stats.BytesUsed, diff --git a/metricbeat/module/ceph/pool_disk/pool_disk.go b/metricbeat/module/ceph/pool_disk/pool_disk.go index 761c25887fac..48eab2d7e30a 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/pkg/errors" ) const ( @@ -42,11 +43,13 @@ func init() { ) } +// MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet *helper.HTTP } +// New creates a new instance of the docker memory MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { @@ -63,18 +66,19 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return errors.Wrap(err, "error in fetch") } events := eventsMapping(content) for _, event := range events { - reporter.Event(mb.Event{MetricSetFields: event}) + reported := reporter.Event(mb.Event{MetricSetFields: event}) + if !reported { + return nil + } } - return + return nil } diff --git a/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go b/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go index d4a0dd3a9bba..c2dc8b3f80cd 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/pool_disk/pool_disk_test.go b/metricbeat/module/ceph/pool_disk/pool_disk_test.go index f5e8a3b9306f..f2367401f8f7 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk_test.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk_test.go @@ -48,8 +48,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } From b18887a62787a81d5c1fe1c138ff51f0d3dd4cf0 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 30 Apr 2019 08:15:37 -0500 Subject: [PATCH 2/4] make fmt --- metricbeat/module/ceph/cluster_health/cluster_health.go | 3 ++- metricbeat/module/ceph/cluster_status/cluster_status.go | 3 ++- metricbeat/module/ceph/osd_df/osd_df.go | 3 ++- metricbeat/module/ceph/osd_tree/osd_tree.go | 3 ++- metricbeat/module/ceph/pool_disk/pool_disk.go | 3 ++- 5 files changed, 10 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/ceph/cluster_health/cluster_health.go b/metricbeat/module/ceph/cluster_health/cluster_health.go index 8b38d73a4e72..24f74e1e4379 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health.go @@ -18,10 +18,11 @@ package cluster_health import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" - "github.com/pkg/errors" ) const ( diff --git a/metricbeat/module/ceph/cluster_status/cluster_status.go b/metricbeat/module/ceph/cluster_status/cluster_status.go index 76e759a611f0..86c9ecd4103d 100644 --- a/metricbeat/module/ceph/cluster_status/cluster_status.go +++ b/metricbeat/module/ceph/cluster_status/cluster_status.go @@ -18,10 +18,11 @@ package cluster_status import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" - "github.com/pkg/errors" ) const ( diff --git a/metricbeat/module/ceph/osd_df/osd_df.go b/metricbeat/module/ceph/osd_df/osd_df.go index b0de7fd0d8a8..c7dad6ccd32a 100644 --- a/metricbeat/module/ceph/osd_df/osd_df.go +++ b/metricbeat/module/ceph/osd_df/osd_df.go @@ -18,10 +18,11 @@ package osd_df import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" - "github.com/pkg/errors" ) const ( diff --git a/metricbeat/module/ceph/osd_tree/osd_tree.go b/metricbeat/module/ceph/osd_tree/osd_tree.go index 7cf9cb3e3d98..aa44f8b417d3 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree.go @@ -18,10 +18,11 @@ package osd_tree import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" - "github.com/pkg/errors" ) const ( diff --git a/metricbeat/module/ceph/pool_disk/pool_disk.go b/metricbeat/module/ceph/pool_disk/pool_disk.go index 48eab2d7e30a..9740a808d7cf 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk.go @@ -18,10 +18,11 @@ package pool_disk import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" - "github.com/pkg/errors" ) const ( From afde0c41478202ce0f16338e5b1bc6822cd349be Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 30 Apr 2019 09:47:30 -0500 Subject: [PATCH 3/4] fixed docs --- .../module/ceph/cluster_health/cluster_health.go | 2 +- metricbeat/module/ceph/cluster_health/data.go | 6 +++--- .../module/ceph/cluster_status/cluster_status.go | 2 +- metricbeat/module/ceph/cluster_status/data.go | 12 ++++++------ metricbeat/module/ceph/osd_df/data.go | 6 +++--- metricbeat/module/ceph/osd_df/osd_df.go | 2 +- metricbeat/module/ceph/osd_tree/data.go | 6 +++--- metricbeat/module/ceph/osd_tree/osd_tree.go | 2 +- metricbeat/module/ceph/pool_disk/data.go | 8 ++++---- metricbeat/module/ceph/pool_disk/pool_disk.go | 2 +- 10 files changed, 24 insertions(+), 24 deletions(-) diff --git a/metricbeat/module/ceph/cluster_health/cluster_health.go b/metricbeat/module/ceph/cluster_health/cluster_health.go index 24f74e1e4379..e651a7332d79 100644 --- a/metricbeat/module/ceph/cluster_health/cluster_health.go +++ b/metricbeat/module/ceph/cluster_health/cluster_health.go @@ -50,7 +50,7 @@ type MetricSet struct { *helper.HTTP } -// New creates a new instance of the docker memory MetricSet. +// New creates a new instance of the cluster_health MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { diff --git a/metricbeat/module/ceph/cluster_health/data.go b/metricbeat/module/ceph/cluster_health/data.go index 46b25efdacc7..cb827e0975fe 100644 --- a/metricbeat/module/ceph/cluster_health/data.go +++ b/metricbeat/module/ceph/cluster_health/data.go @@ -25,20 +25,20 @@ import ( "github.com/elastic/beats/libbeat/common" ) -//Timecheck contains part of the response from a HealthRequest +// Timecheck contains part of the response from a HealthRequest type Timecheck struct { RoundStatus string `json:"round_status"` Epoch int64 `json:"epoch"` Round int64 `json:"round"` } -//Output is the body of the status response +// Output is the body of the status response type Output struct { OverallStatus string `json:"overall_status"` Timechecks Timecheck `json:"timechecks"` } -//HealthRequest represents the response to a cluster health request +// HealthRequest represents the response to a cluster health request type HealthRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/cluster_status/cluster_status.go b/metricbeat/module/ceph/cluster_status/cluster_status.go index 86c9ecd4103d..b7a9eaa469b9 100644 --- a/metricbeat/module/ceph/cluster_status/cluster_status.go +++ b/metricbeat/module/ceph/cluster_status/cluster_status.go @@ -50,7 +50,7 @@ type MetricSet struct { *helper.HTTP } -// New creates a new instance of the docker memory MetricSet. +// New creates a new instance of the cluster_status MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { diff --git a/metricbeat/module/ceph/cluster_status/data.go b/metricbeat/module/ceph/cluster_status/data.go index 0368b965bd62..064026355fec 100644 --- a/metricbeat/module/ceph/cluster_status/data.go +++ b/metricbeat/module/ceph/cluster_status/data.go @@ -25,13 +25,13 @@ import ( "github.com/elastic/beats/libbeat/common" ) -//PgState represents placement group state +// PgState represents placement group state type PgState struct { Count int64 `json:"count"` StateName string `json:"state_name"` } -//Pgmap represents data from a placement group +// Pgmap represents data from a placement group type Pgmap struct { AvailByte int64 `json:"bytes_avail"` TotalByte int64 `json:"bytes_total"` @@ -56,7 +56,7 @@ type Pgmap struct { PgStates []PgState `json:"pgs_by_state"` } -//Osdmap represents data from an OSD +// Osdmap represents data from an OSD type Osdmap struct { Epoch int64 `json:"epoch"` Full bool `json:"full"` @@ -67,18 +67,18 @@ type Osdmap struct { RemapedPgs int64 `json:"num_remapped_pgs"` } -//Osdmap_ is a placeholder for the json parser +// Osdmap_ is a placeholder for the json parser type Osdmap_ struct { Osdmap Osdmap `json:"osdmap"` } -//Output is the response body +// Output is the response body type Output struct { Pgmap Pgmap `json:"pgmap"` Osdmap Osdmap_ `json:"osdmap"` } -//HealthRequest represents the response to a health request +// HealthRequest represents the response to a health request type HealthRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/osd_df/data.go b/metricbeat/module/ceph/osd_df/data.go index 08b08c39f257..fa0422afa519 100644 --- a/metricbeat/module/ceph/osd_df/data.go +++ b/metricbeat/module/ceph/osd_df/data.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/beats/libbeat/common" ) -//Node represents a node object +// Node represents a node object type Node struct { ID int64 `json:"id"` Name string `json:"name"` @@ -36,12 +36,12 @@ type Node struct { DeviceClass string `json:"device_class"` } -//Output contains a node list from the df response +// Output contains a node list from the df response type Output struct { Nodes []Node `json:"nodes"` } -//OsdDfRequest contains the df response +// OsdDfRequest contains the df response type OsdDfRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/osd_df/osd_df.go b/metricbeat/module/ceph/osd_df/osd_df.go index c7dad6ccd32a..79139ed89fa1 100644 --- a/metricbeat/module/ceph/osd_df/osd_df.go +++ b/metricbeat/module/ceph/osd_df/osd_df.go @@ -50,7 +50,7 @@ type MetricSet struct { *helper.HTTP } -// New creates a new instance of the docker memory MetricSet. +// New creates a new instance of the osd_df MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { diff --git a/metricbeat/module/ceph/osd_tree/data.go b/metricbeat/module/ceph/osd_tree/data.go index 8a3b47963f7c..5f7d32056f0a 100644 --- a/metricbeat/module/ceph/osd_tree/data.go +++ b/metricbeat/module/ceph/osd_tree/data.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -//Node represents a node object +// Node represents a node object type Node struct { ID int64 `json:"id"` Name string `json:"name"` @@ -43,12 +43,12 @@ type Node struct { DeviceClass string `json:"device_class"` } -//Output contains a node list from the df response +// Output contains a node list from the df response type Output struct { Nodes []Node `json:"nodes"` } -//OsdTreeRequest is a OSD response object +// OsdTreeRequest is a OSD response object type OsdTreeRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/osd_tree/osd_tree.go b/metricbeat/module/ceph/osd_tree/osd_tree.go index aa44f8b417d3..0b3476c681f1 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree.go @@ -50,7 +50,7 @@ type MetricSet struct { *helper.HTTP } -// New creates a new instance of the docker memory MetricSet. +// New creates a new instance of the osd_tree MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { diff --git a/metricbeat/module/ceph/pool_disk/data.go b/metricbeat/module/ceph/pool_disk/data.go index a1b81aa270a5..d2bef377dc11 100644 --- a/metricbeat/module/ceph/pool_disk/data.go +++ b/metricbeat/module/ceph/pool_disk/data.go @@ -24,7 +24,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -//Stats represents the statistics for a pool +// Stats represents the statistics for a pool type Stats struct { BytesUsed int64 `json:"bytes_used"` MaxAvail int64 `json:"max_avail"` @@ -32,19 +32,19 @@ type Stats struct { KbUsed int64 `json:"kb_used"` } -//Pool represents a given Ceph pool +// Pool represents a given Ceph pool type Pool struct { ID int64 `json:"id"` Name string `json:"name"` Stats Stats `json:"stats"` } -//Output is a list of pools from the response +// Output is a list of pools from the response type Output struct { Pools []Pool `json:"pools"` } -//DfRequest is the df response object +// DfRequest is the df response object type DfRequest struct { Status string `json:"status"` Output Output `json:"output"` diff --git a/metricbeat/module/ceph/pool_disk/pool_disk.go b/metricbeat/module/ceph/pool_disk/pool_disk.go index 9740a808d7cf..dfcbf79c97a3 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk.go @@ -50,7 +50,7 @@ type MetricSet struct { *helper.HTTP } -// New creates a new instance of the docker memory MetricSet. +// New creates a new instance of the pool_disk MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { http, err := helper.NewHTTP(base) if err != nil { From f93970a932819a5aaa8ca93a6e01a70f65f67dfb Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 30 Apr 2019 22:33:30 -0500 Subject: [PATCH 4/4] return on closed chan --- metricbeat/module/ceph/osd_tree/osd_tree.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metricbeat/module/ceph/osd_tree/osd_tree.go b/metricbeat/module/ceph/osd_tree/osd_tree.go index 0b3476c681f1..85192cff2920 100644 --- a/metricbeat/module/ceph/osd_tree/osd_tree.go +++ b/metricbeat/module/ceph/osd_tree/osd_tree.go @@ -79,7 +79,10 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { } for _, event := range events { - reporter.Event(mb.Event{MetricSetFields: event}) + reported := reporter.Event(mb.Event{MetricSetFields: event}) + if !reported { + return nil + } } return nil