Skip to content

Commit

Permalink
Introduce error reporting in schema (#3790)
Browse files Browse the repository at this point in the history
Currently in schema errors are only reported through logging which makes testing difficult. This PR introduces the collection of errors during applying the schema.

Now Apply returns a list of errors. Currently these are ignored to keep the same implementation as before. Over time this should be adjusted to handle errors correctly. Logging in the schema should be remove and moved up to the Metricsets itself.

This change should also simplify testing of different versions in unit tests, as `schema.Apply` will return errors. Currently this check is done in the system tests checking the log itself which is not very efficient.

ApplyNoError was introduced as a temporary workaround for the processors implementation.

This is part of #3807
  • Loading branch information
ruflin authored and exekias committed Mar 27, 2017
1 parent 3b4a945 commit 0288d2b
Show file tree
Hide file tree
Showing 28 changed files with 191 additions and 43 deletions.
4 changes: 2 additions & 2 deletions libbeat/processors/add_cloud_metadata/add_cloud_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ var (
"machine_type": c.Str("instanceType"),
"region": c.Str("region"),
"availability_zone": c.Str("availabilityZone"),
}.Apply
}.ApplyNoError

doSchema = s.Schema{
"instance_id": c.StrFromNum("droplet_id"),
"region": c.Str("region"),
}.Apply
}.ApplyNoError

gceHeaders = map[string]string{"Metadata-Flavor": "Google"}
gceSchema = func(m map[string]interface{}) common.MapStr {
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/haproxy/info/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,6 @@ func eventMapping(info *haproxy.Info) (common.MapStr, error) {

}

return schema.Apply(source), nil
data, _ := schema.Apply(source)
return data, nil
}
4 changes: 3 additions & 1 deletion metricbeat/module/haproxy/stat/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ func eventMapping(info []*haproxy.Stat) []common.MapStr {
source[typeOfT.Field(i).Name] = f.Interface()

}
events = append(events, schema.Apply(source))

data, _ := schema.Apply(source)
events = append(events, data)
}

return events
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/memcached/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
}
}

event := schema.Apply(data)
event, _ := schema.Apply(data)
return event, nil
}
2 changes: 0 additions & 2 deletions metricbeat/module/mongodb/dbstats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,3 @@ var schema = s.Schema{
"size": c.Int("size"),
}, c.DictOptional),
}

var eventMapping = schema.Apply
3 changes: 2 additions & 1 deletion metricbeat/module/mongodb/dbstats/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
logp.Err("Failed to retrieve stats for db %s", dbName)
continue
}
events = append(events, eventMapping(result))
data, _ := schema.Apply(result)
events = append(events, data)
}

if len(events) == 0 {
Expand Down
2 changes: 0 additions & 2 deletions metricbeat/module/mongodb/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,3 @@ var wiredTigerSchema = s.Schema{
"syncs": c.Int("log sync operations"),
}),
}

var eventMapping = schema.Apply
3 changes: 2 additions & 1 deletion metricbeat/module/mongodb/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,6 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
return nil, err
}

return eventMapping(result), nil
data, _ := schema.Apply(result)
return data, nil
}
3 changes: 2 additions & 1 deletion metricbeat/module/mysql/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func eventMapping(status map[string]string) common.MapStr {
for key, val := range status {
source[key] = val
}
return schema.Apply(source)
data, _ := schema.Apply(source)
return data
}

func rawEventMapping(status map[string]string) common.MapStr {
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/php_fpm/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
return nil, fmt.Errorf("error parsing json: %v", err)
}

return schema.Apply(stats), nil
data, _ := schema.Apply(stats)
return data, nil
}
3 changes: 2 additions & 1 deletion metricbeat/module/postgresql/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {

events := []common.MapStr{}
for _, result := range results {
events = append(events, eventMapping(result))
data, _ := schema.Apply(result)
events = append(events, data)
}

return events, nil
Expand Down
2 changes: 0 additions & 2 deletions metricbeat/module/postgresql/activity/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,3 @@ var schema = s.Schema{
"state": c.Str("state"),
"query": c.Str("query"),
}

var eventMapping = schema.Apply
3 changes: 2 additions & 1 deletion metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ func (m *MetricSet) Fetch() (common.MapStr, error) {
return nil, fmt.Errorf("No results from the pg_stat_bgwriter query")
}

return schema.Apply(results[0]), nil
data, _ := schema.Apply(results[0])
return data, nil
}
2 changes: 0 additions & 2 deletions metricbeat/module/postgresql/database/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,3 @@ var schema = s.Schema{
"deadlocks": c.Int("deadlocks"),
"stats_reset": c.Time(time.RFC3339Nano, "stats_reset", s.Optional),
}

var eventMapping = schema.Apply
3 changes: 2 additions & 1 deletion metricbeat/module/postgresql/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {

events := []common.MapStr{}
for _, result := range results {
events = append(events, eventMapping(result))
data, _ := schema.Apply(result)
events = append(events, data)
}

return events, nil
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/prometheus/stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ var (
)

func eventMapping(entries map[string]interface{}) (common.MapStr, error) {
return schema.Apply(entries), nil
data, _ := schema.Apply(entries)
return data, nil
}
3 changes: 2 additions & 1 deletion metricbeat/module/redis/info/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,6 @@ func eventMapping(info map[string]string) common.MapStr {
for key, val := range info {
source[key] = val
}
return schema.Apply(source)
data, _ := schema.Apply(source)
return data
}
3 changes: 2 additions & 1 deletion metricbeat/module/redis/keyspace/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func parseKeyspaceStats(keyspaceMap map[string]string) map[string]common.MapStr
db[stats[0]] = stats[1]
}
}
keyspace[k] = schema.Apply(db)
data, _ := schema.Apply(db)
keyspace[k] = data
}
}
return keyspace
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/zookeeper/mntr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func eventMapping(response io.Reader) common.MapStr {
}
}

event := schema.Apply(fullEvent)
event, _ := schema.Apply(fullEvent)

// only exposed by the Leader
if _, ok := fullEvent["zk_followers"]; ok {
Expand Down
36 changes: 36 additions & 0 deletions metricbeat/schema/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package schema

import "fmt"

const (
RequiredType ErrorType = iota
OptionalType ErrorType = iota
)

type ErrorType int

type Error struct {
key string
message string
errorType ErrorType
}

func NewError(key string, message string) *Error {
return &Error{
key: key,
message: message,
errorType: RequiredType,
}
}

func (err *Error) SetType(errorType ErrorType) {
err.errorType = errorType
}

func (err *Error) IsType(errorType ErrorType) bool {
return err.errorType == errorType
}

func (err *Error) Error() string {
return fmt.Sprintf("Missing field: %s, Error: %s", err.key, err.message)
}
20 changes: 20 additions & 0 deletions metricbeat/schema/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package schema

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsError(t *testing.T) {
err := NewError("test", "Hello World")
assert.Error(t, err)
}

func TestType(t *testing.T) {
err := NewError("test", "Hello World")
assert.True(t, err.IsType(RequiredType))

err.SetType(OptionalType)
assert.True(t, err.IsType(OptionalType))
}
45 changes: 45 additions & 0 deletions metricbeat/schema/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package schema

type Errors []Error

func NewErrors() *Errors {
return &Errors{}
}

func (errs *Errors) AddError(err *Error) {
*errs = append(*errs, *err)
}

func (errs *Errors) AddErrors(errors *Errors) {
if errors == nil {
return
}
*errs = append(*errs, *errors...)
}

func (errs *Errors) HasRequiredErrors() bool {
for _, err := range *errs {
if err.IsType(RequiredType) {
return true
}
}
return false
}

func (errs *Errors) Error() string {
error := "Required fields are missing: "
for _, err := range *errs {
if err.IsType(RequiredType) {
error = error + "," + err.key
}
}
return error
}

func (errs *Errors) ErrorDebug() string {
error := "Fields are missing: "
for _, err := range *errs {
error = error + "," + err.key
}
return error
}
15 changes: 15 additions & 0 deletions metricbeat/schema/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package schema

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrors(t *testing.T) {
errs := NewErrors()
err := NewError("test", "Hello World")
errs.AddError(err)

assert.True(t, errs.HasRequiredErrors())
}
18 changes: 13 additions & 5 deletions metricbeat/schema/mapstriface/mapstriface.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ Note that this allows for converting, renaming, and restructuring the data.
package mapstriface

import (
"encoding/json"
"fmt"
"time"

"encoding/json"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/schema"
Expand All @@ -71,18 +70,27 @@ type ConvMap struct {
}

// Map drills down in the data dictionary by using the key
func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) {
func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) *schema.Errors {

subData, ok := data[convMap.Key].(map[string]interface{})
if !ok {
if !convMap.Optional {
err := schema.NewError(convMap.Key, "Error accessing sub-dictionary")
if convMap.Optional {
err.SetType(schema.OptionalType)
} else {
logp.Err("Error accessing sub-dictionary `%s`", convMap.Key)
}
return

errors := schema.NewErrors()
errors.AddError(err)

return errors
}

subEvent := common.MapStr{}
convMap.Schema.ApplyTo(subEvent, subData)
event[key] = subEvent
return nil
}

func (convMap ConvMap) HasKey(key string) bool {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/schema/mapstriface/mapstriface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ func TestConversions(t *testing.T) {
},
}

output := schema.Apply(input)
output, _ := schema.Apply(input)
assert.Equal(t, output, expected)
}
2 changes: 1 addition & 1 deletion metricbeat/schema/mapstrstr/mapstrstr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ func TestConversions(t *testing.T) {
},
}

output := schema.Apply(input)
output, _ := schema.Apply(input)
assert.Equal(t, output, expected)
}
Loading

0 comments on commit 0288d2b

Please sign in to comment.