Skip to content

Commit

Permalink
[Metricbeats] Migrate uwsgi/status to ReporterV2 (#11153)
Browse files Browse the repository at this point in the history
* upgrate uwsgi to ReporterV2
  • Loading branch information
fearful-symmetry authored Mar 8, 2019
1 parent 83dfb2f commit 0d10900
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 108 deletions.
72 changes: 25 additions & 47 deletions metricbeat/module/uwsgi/status/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,53 +1,31 @@
{
"@timestamp": "2011-11-11T00:00:00.000Z",
"@timestamp": "2017-10-12T08:05:34.853Z",
"event": {
"dataset": "uwsgi.status",
"duration": 115000,
"module": "uwsgi"
},
"metricset": {
"rtt": 3069,
"name": "stat",
"module": "uwsgi"
"name": "status"
},
"service": {
"address": "127.0.0.1:9192",
"type": "uwsgi"
},
"uwsgi": {
"status": {
"total": {
"requests": 102,
"exceptions": 1,
"write_errors": 0,
"read_errors": 0,
"pid": 1
},

"worker": {
"signals": 0,
"status": "idle",
"id": 1,
"pid": 7,
"accepting": 1,
"rss": 0,
"respawn_count": 1,
"tx": 5599,
"requests": 102,
"delta_requests": 102,
"running_time": 10103,
"exceptions": 1,
"avg_rt": 91,
"vsz": 0,
"signal_queue": 0,
"harakiri_count": 0
},

"core": {
"worker_pid": 7,
"requests": 75,
"static_requests": 0,
"routed_requests": 0,
"offloaded_requests": 0,
"write_errors": 0,
"read_errors": 0
"status": {
"core": {
"id": 1,
"read_errors": 0,
"requests": {
"offloaded": 0,
"routed": 0,
"static": 0,
"total": 941
},
"worker_pid": 7,
"write_errors": 0
}
}
}
},
"beat": {
"name": "beatname",
"hostname": "hostname",
"version": "7.0.0-alpha1"
}
}
}
18 changes: 9 additions & 9 deletions metricbeat/module/uwsgi/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package status
import (
"encoding/json"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
)

type uwsgiCore struct {
Expand Down Expand Up @@ -75,15 +77,13 @@ type uwsgiStat struct {
// Sockets []UwsgiSocket `json:"sockets"`
}

func eventsMapping(content []byte) ([]common.MapStr, error) {
func eventsMapping(content []byte, reporter mb.ReporterV2) error {
var stats uwsgiStat
err := json.Unmarshal(content, &stats)
if err != nil {
logp.Err("uwsgi statistics parsing failed with error: %+v", err)
return nil, err
return errors.Wrap(err, "uwsgi statistics parsing failed")
}

events := []common.MapStr{}
totalRequests := 0
totalExceptions := 0
totalWriteErrors := 0
Expand Down Expand Up @@ -133,11 +133,11 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
"read_errors": core.ReadErrors,
},
}
events = append(events, coreEvent)
reporter.Event(mb.Event{MetricSetFields: coreEvent})
coreID++
}

events = append(events, workerEvent)
reporter.Event(mb.Event{MetricSetFields: workerEvent})
}

// overall
Expand All @@ -151,6 +151,6 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
},
}

events = append(events, baseEvent)
return events, nil
reporter.Event(mb.Event{MetricSetFields: baseEvent})
return nil
}
29 changes: 14 additions & 15 deletions metricbeat/module/uwsgi/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
package status

import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"

"github.com/elastic/beats/libbeat/logp"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -55,8 +55,8 @@ func fetchStatData(URL string) ([]byte, error) {

u, err := url.Parse(URL)
if err != nil {
logp.Err("parsing uwsgi stats url failed: %+v", err)
return nil, err

return nil, errors.Wrap(err, "parsing uwsgi stats url failed")
}

switch u.Scheme {
Expand All @@ -83,8 +83,8 @@ func fetchStatData(URL string) ([]byte, error) {
defer res.Body.Close()

if res.StatusCode != 200 {
logp.Err("failed to fetch uwsgi status with code: %d", res.StatusCode)
return nil, errors.New("http failed")

return nil, fmt.Errorf("failed to fetch uwsgi status with code: %d", res.StatusCode)
}
reader = res.Body
default:
Expand All @@ -93,23 +93,22 @@ func fetchStatData(URL string) ([]byte, error) {

data, err := ioutil.ReadAll(reader)
if err != nil {
logp.Err("uwsgi data read failed: %+v", err)
return nil, err
return nil, errors.Wrap(err, "uwsgi data read failed")
}

return data, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := fetchStatData(m.HostData().URI)
if err != nil {
return []common.MapStr{
common.MapStr{
"error": err.Error(),
},
}, err
reporter.Event(mb.Event{MetricSetFields: common.MapStr{
"error": err.Error(),
}},
)
return err
}
return eventsMapping(content)
return eventsMapping(content, reporter)
}
30 changes: 22 additions & 8 deletions metricbeat/module/uwsgi/status/status_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,37 @@ import (
func TestFetchTCP(t *testing.T) {
compose.EnsureUp(t, "uwsgi_tcp")

f := mbtest.NewEventsFetcher(t, getConfig("tcp"))
events, err := f.Fetch()
assert.NoError(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig("tcp"))
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

assert.True(t, len(events) > 0)
t.Log(events)
totals := findItems(events, "total")
assert.Equal(t, 1, len(totals))
}

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig("http"))

if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}

func TestFetchHTTP(t *testing.T) {
compose.EnsureUp(t, "uwsgi_http")

f := mbtest.NewEventsFetcher(t, getConfig("http"))
events, err := f.Fetch()
assert.NoError(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig("http"))
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

assert.True(t, len(events) > 0)
t.Log(events)
totals := findItems(events, "total")
assert.Equal(t, 1, len(totals))
}
Expand Down
8 changes: 5 additions & 3 deletions metricbeat/module/uwsgi/status/status_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ func TestFetchDataUnixSock(t *testing.T) {
"hosts": []string{"unix://" + listener.Addr().String()},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.NoError(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assertTestData(t, events)
wg.Wait()
Expand Down
57 changes: 31 additions & 26 deletions metricbeat/module/uwsgi/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

Expand All @@ -47,26 +48,26 @@ func testData(t *testing.T) (data []byte) {
return
}

func findItems(mp []common.MapStr, key string) []common.MapStr {
func findItems(mp []mb.Event, key string) []common.MapStr {
result := make([]common.MapStr, 0, 1)
for _, v := range mp {
if el, ok := v[key]; ok {
if el, ok := v.MetricSetFields[key]; ok {
result = append(result, el.(common.MapStr))
}
}

return result
}

func assertTestData(t *testing.T, mp []common.MapStr) {
totals := findItems(mp, "total")
func assertTestData(t *testing.T, evt []mb.Event) {
totals := findItems(evt, "total")
assert.Equal(t, 1, len(totals))
assert.Equal(t, 2042, totals[0]["requests"])
assert.Equal(t, 0, totals[0]["exceptions"])
assert.Equal(t, 34, totals[0]["write_errors"])
assert.Equal(t, 38, totals[0]["read_errors"])

workers := findItems(mp, "core")
workers := findItems(evt, "core")
assert.Equal(t, 4, len(workers))
}

Expand All @@ -93,9 +94,11 @@ func TestFetchDataTCP(t *testing.T) {
"hosts": []string{"tcp://" + listener.Addr().String()},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.NoError(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assertTestData(t, events)
wg.Wait()
Expand All @@ -117,9 +120,11 @@ func TestFetchDataHTTP(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.NoError(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assertTestData(t, events)
}
Expand All @@ -138,9 +143,9 @@ func TestFetchDataUnmarshalledError(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
_, err := f.Fetch()
assert.Error(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
_, errs := mbtest.ReportingFetchV2Error(f)
assert.NotEmpty(t, errs)
}

func TestFetchDataSourceDown(t *testing.T) {
Expand All @@ -155,9 +160,9 @@ func TestFetchDataSourceDown(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
_, err := f.Fetch()
assert.Error(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
_, errs := mbtest.ReportingFetchV2Error(f)
assert.NotEmpty(t, errs)
}

func TestConfigError(t *testing.T) {
Expand All @@ -167,27 +172,27 @@ func TestConfigError(t *testing.T) {
"hosts": []string{"unix://127.0.0.1:8080"},
}

f := mbtest.NewEventsFetcher(t, config)
_, err := f.Fetch()
assert.Error(t, err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
_, errs := mbtest.ReportingFetchV2Error(f)
assert.NotEmpty(t, errs)

config = map[string]interface{}{
"module": "uwsgi",
"metricsets": []string{"status"},
"hosts": []string{"unknown_url_format"},
}

f = mbtest.NewEventsFetcher(t, config)
_, err = f.Fetch()
assert.Error(t, err)
f = mbtest.NewReportingMetricSetV2Error(t, config)
_, errs = mbtest.ReportingFetchV2Error(f)
assert.NotEmpty(t, errs)

config = map[string]interface{}{
"module": "uwsgi",
"metricsets": []string{"status"},
"hosts": []string{"ftp://127.0.0.1:8080"},
}

f = mbtest.NewEventsFetcher(t, config)
_, err = f.Fetch()
assert.Error(t, err)
f = mbtest.NewReportingMetricSetV2Error(t, config)
_, errs = mbtest.ReportingFetchV2Error(f)
assert.NotEmpty(t, errs)
}

0 comments on commit 0d10900

Please sign in to comment.