From 7bd35f4b1e3e377bb1be2b4a673e3913933f480e Mon Sep 17 00:00:00 2001 From: Tudor Golubenco Date: Fri, 15 Jul 2016 14:05:40 +0200 Subject: [PATCH] Don't include empty values in Metricbeat (#2034) This is an alternative implementation of #2032. Instead of collecting the errors and then removing the empty values, we define a schema of conversions and we use code to apply them, so we have the opportunity to handle errors. Fixes #1972. --- CHANGELOG.asciidoc | 2 + metricbeat/helper/conversion.go | 139 ++++++++++++---- metricbeat/module/apache/status/data.go | 72 ++++---- .../apache/status/status_integration_test.go | 4 +- metricbeat/module/mysql/status/data.go | 53 +++--- metricbeat/module/redis/info/data.go | 156 +++++++++--------- metricbeat/module/redis/keyspace/data.go | 12 +- metricbeat/module/zookeeper/mntr/data.go | 56 ++++--- 8 files changed, 298 insertions(+), 196 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 13b1def970d..fadc0a3df4d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,6 +37,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d *Metricbeat* +- Do not send zero values when no value was present in the source. {issue}1972[1972] + *Packetbeat* *Topbeat* diff --git a/metricbeat/helper/conversion.go b/metricbeat/helper/conversion.go index bf17a3c9769..ca4886229c1 100644 --- a/metricbeat/helper/conversion.go +++ b/metricbeat/helper/conversion.go @@ -1,81 +1,145 @@ -/* -The conversion functions take a key and a map[string]string. First it checks if the key exists and logs and error -if this is not the case. Second the conversion to the type is done. In case of an error and error is logged and the -default values is returned. This guarantees that also if a field is missing or is not defined, still the full metricset -is returned. 
-*/ package helper import ( + "fmt" "strconv" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) +// Schema describes how a map[string]string object can be parsed and converted into +// an event. The conversions can be described using an (optionally nested) common.MapStr +// that contains Conv objects. +type Schema struct { + conversions common.MapStr +} + +// A Conv object represents a conversion mechanism from the data map to the event map. +type Conv struct { + Func Convertor // Convertor function + Key string // The key in the data map + Optional bool // Whether to log errors if the key is not found +} + +// Convertor function type +type Convertor func(key string, data map[string]string) (interface{}, error) + +// NewSchema creates a new converting schema. +func NewSchema(conversions common.MapStr) Schema { + return Schema{conversions} +} + +// ApplyTo adds the fields extracted from data, converted using the schema, to the +// event map. +func (s Schema) ApplyTo(event common.MapStr, data map[string]string) common.MapStr { + applySchemaToEvent(event, data, s.conversions) + return event +} + +// Apply converts the fields extracted from data, using the schema, into a new map. +func (s Schema) Apply(data map[string]string) common.MapStr { + return s.ApplyTo(common.MapStr{}, data) +} + +func applySchemaToEvent(event common.MapStr, data map[string]string, conversions common.MapStr) { + for key, conversion := range conversions { + switch conversion.(type) { + case Conv: + conv := conversion.(Conv) + value, err := conv.Func(conv.Key, data) + if err != nil { + if !conv.Optional { + logp.Err("Error on field '%s': %v", key, err) + } + } else { + event[key] = value + } + case common.MapStr: + subEvent := common.MapStr{} + applySchemaToEvent(subEvent, data, conversion.(common.MapStr)) + event[key] = subEvent + } + } +} + // ToBool converts value to bool. 
In case of error, returns false -func ToBool(key string, data map[string]string) bool { +func ToBool(key string, data map[string]string) (interface{}, error) { exists := checkExist(key, data) if !exists { - logp.Err("Key does not exist in in data: %s", key) - return false + return false, fmt.Errorf("Key `%s` not found", key) } value, err := strconv.ParseBool(data[key]) if err != nil { - logp.Err("Error converting param to bool: %s", key) - return false + return false, fmt.Errorf("Error converting param to bool: %s", key) } - return value + return value, nil +} + +// Bool creates a Conv object for parsing booleans +func Bool(key string, opts ...SchemaOption) Conv { + return setOptions(Conv{Key: key, Func: ToBool}, opts) } // ToFloat converts value to float64. In case of error, returns 0.0 -func ToFloat(key string, data map[string]string) float64 { +func ToFloat(key string, data map[string]string) (interface{}, error) { exists := checkExist(key, data) if !exists { - logp.Err("Key does not exist in in data: %s", key) - return 0.0 + return false, fmt.Errorf("Key `%s` not found", key) } value, err := strconv.ParseFloat(data[key], 64) if err != nil { - logp.Err("Error converting param to float: %s", key) - value = 0.0 + return 0.0, fmt.Errorf("Error converting param to float: %s", key) } - return value + return value, nil +} + +// Float creates a Conv object for parsing floats +func Float(key string, opts ...SchemaOption) Conv { + return setOptions(Conv{Key: key, Func: ToFloat}, opts) } // ToInt converts value to int. 
In case of error, returns 0 -func ToInt(key string, data map[string]string) int64 { +func ToInt(key string, data map[string]string) (interface{}, error) { exists := checkExist(key, data) if !exists { - logp.Err("Key does not exist in in data: %s", key) - return 0 + return false, fmt.Errorf("Key `%s` not found", key) } value, err := strconv.ParseInt(data[key], 10, 64) if err != nil { - logp.Err("Error converting param to int: %s", key) - return 0 + return 0, fmt.Errorf("Error converting param to int: %s", key) } - return value + return value, nil +} + +// Int creates a Conv object for parsing integers +func Int(key string, opts ...SchemaOption) Conv { + return setOptions(Conv{Key: key, Func: ToInt}, opts) } // ToStr converts value to str. In case of error, returns "" -func ToStr(key string, data map[string]string) string { +func ToStr(key string, data map[string]string) (interface{}, error) { exists := checkExist(key, data) if !exists { - logp.Err("Key does not exist in in data: %s", key) - return "" + return false, fmt.Errorf("Key `%s` not found", key) } - return data[key] + return data[key], nil +} + +// Str creates a Conv object for parsing strings +func Str(key string, opts ...SchemaOption) Conv { + return setOptions(Conv{Key: key, Func: ToStr}, opts) } // checkExists checks if a key exists in the given data set @@ -83,3 +147,22 @@ func checkExist(key string, data map[string]string) bool { _, ok := data[key] return ok } + +// SchemaOption is for adding optional parameters to the conversion +// functions +type SchemaOption func(c Conv) Conv + +// The optional flag suppresses the error message in case the key +// doesn't exist or results in an error. 
+func Optional(c Conv) Conv { + c.Optional = true + return c +} + +// setOptions adds the optional flags to the Conv object +func setOptions(c Conv, opts []SchemaOption) Conv { + for _, opt := range opts { + c = opt(c) + } + return c +} diff --git a/metricbeat/module/apache/status/data.go b/metricbeat/module/apache/status/data.go index eed902dfece..5c183a1e637 100644 --- a/metricbeat/module/apache/status/data.go +++ b/metricbeat/module/apache/status/data.go @@ -15,6 +15,42 @@ var ( // This should match: "CPUSystem: .01" matchNumber = regexp.MustCompile("(^[0-9a-zA-Z ]+):\\s+(\\d*\\.?\\d+)") + + schema = h.NewSchema(common.MapStr{ + "total_accesses": h.Int("Total Accesses"), + "total_kbytes": h.Int("Total kBytes"), + "requests_per_sec": h.Float("ReqPerSec", h.Optional), + "bytes_per_sec": h.Float("BytesPerSec", h.Optional), + "bytes_per_request": h.Float("BytesPerReq", h.Optional), + "workers": common.MapStr{ + "busy": h.Int("BusyWorkers"), + "idle": h.Int("IdleWorkers"), + }, + "uptime": common.MapStr{ + "server_uptime": h.Int("ServerUptimeSeconds"), + "uptime": h.Int("Uptime"), + }, + "cpu": common.MapStr{ + "load": h.Float("CPULoad", h.Optional), + "user": h.Float("CPUUser"), + "system": h.Float("CPUSystem"), + "children_user": h.Float("CPUChildrenUser"), + "children_system": h.Float("CPUChildrenSystem"), + }, + "connections": common.MapStr{ + "total": h.Int("ConnsTotal"), + "async": common.MapStr{ + "writing": h.Int("ConnsAsyncWriting"), + "keep_alive": h.Int("ConnsAsyncKeepAlive"), + "closing": h.Int("ConnsAsyncClosing"), + }, + }, + "load": common.MapStr{ + "1": h.Float("Load1"), + "5": h.Float("Load5"), + "15": h.Float("Load15"), + }, + }) ) // Map body to MapStr @@ -89,40 +125,7 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr { } event := common.MapStr{ - "hostname": hostname, - "total_accesses": h.ToInt("Total Accesses", fullEvent), - "total_kbytes": h.ToInt("Total kBytes", fullEvent), - "requests_per_sec": h.ToFloat("ReqPerSec", 
fullEvent), - "bytes_per_sec": h.ToFloat("BytesPerSec", fullEvent), - "bytes_per_request": h.ToFloat("BytesPerReq", fullEvent), - "workers": common.MapStr{ - "busy": h.ToInt("BusyWorkers", fullEvent), - "idle": h.ToInt("IdleWorkers", fullEvent), - }, - "uptime": common.MapStr{ - "server_uptime": h.ToInt("ServerUptimeSeconds", fullEvent), - "uptime": h.ToInt("Uptime", fullEvent), - }, - "cpu": common.MapStr{ - "load": h.ToFloat("CPULoad", fullEvent), - "user": h.ToFloat("CPUUser", fullEvent), - "system": h.ToFloat("CPUSystem", fullEvent), - "children_user": h.ToFloat("CPUChildrenUser", fullEvent), - "children_system": h.ToFloat("CPUChildrenSystem", fullEvent), - }, - "connections": common.MapStr{ - "total": h.ToInt("ConnsTotal", fullEvent), - "async": common.MapStr{ - "writing": h.ToInt("ConnsAsyncWriting", fullEvent), - "keep_alive": h.ToInt("ConnsAsyncKeepAlive", fullEvent), - "closing": h.ToInt("ConnsAsyncClosing", fullEvent), - }, - }, - "load": common.MapStr{ - "1": h.ToFloat("Load1", fullEvent), - "5": h.ToFloat("Load5", fullEvent), - "15": h.ToFloat("Load15", fullEvent), - }, + "hostname": hostname, "scoreboard": common.MapStr{ "starting_up": totalS, "reading_request": totalR, @@ -138,6 +141,7 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr { "total": totalAll, }, } + schema.ApplyTo(event, fullEvent) return event } diff --git a/metricbeat/module/apache/status/status_integration_test.go b/metricbeat/module/apache/status/status_integration_test.go index 119c54e9072..e3a4a807c06 100644 --- a/metricbeat/module/apache/status/status_integration_test.go +++ b/metricbeat/module/apache/status/status_integration_test.go @@ -21,7 +21,9 @@ func TestFetch(t *testing.T) { t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) // Check number of fields. 
- assert.Equal(t, 12, len(event)) + if len(event) < 11 { + t.Fatal("Too few top-level elements in the event") + } } func TestData(t *testing.T) { diff --git a/metricbeat/module/mysql/status/data.go b/metricbeat/module/mysql/status/data.go index 8db4acc13cf..013583674e8 100644 --- a/metricbeat/module/mysql/status/data.go +++ b/metricbeat/module/mysql/status/data.go @@ -5,47 +5,48 @@ import ( h "github.com/elastic/beats/metricbeat/helper" ) -// Map data to MapStr of server stats variables: http://dev.mysql.com/doc/refman/5.7/en/server-status-variables.html -// This is only a subset of the available values -func eventMapping(status map[string]string) common.MapStr { - - event := common.MapStr{ +var ( + schema = h.NewSchema(common.MapStr{ "aborted": common.MapStr{ - "clients": h.ToInt("Aborted_clients", status), - "connects": h.ToInt("Aborted_connects", status), + "clients": h.Int("Aborted_clients"), + "connects": h.Int("Aborted_connects"), }, "binlog": common.MapStr{ "cache": common.MapStr{ - "disk_use": h.ToInt("Binlog_cache_disk_use", status), - "use": h.ToInt("Binlog_cache_use", status), + "disk_use": h.Int("Binlog_cache_disk_use"), + "use": h.Int("Binlog_cache_use"), }, }, "bytes": common.MapStr{ - "received": h.ToInt("Bytes_received", status), - "sent": h.ToInt("Bytes_sent", status), + "received": h.Int("Bytes_received"), + "sent": h.Int("Bytes_sent"), }, - "connections": h.ToInt("Connections", status), + "connections": h.Int("Connections"), "created": common.MapStr{ "tmp": common.MapStr{ - "disk_tables": h.ToInt("Created_tmp_disk_tables", status), - "files": h.ToInt("Created_tmp_files", status), - "tables": h.ToInt("Created_tmp_tables", status), + "disk_tables": h.Int("Created_tmp_disk_tables"), + "files": h.Int("Created_tmp_files"), + "tables": h.Int("Created_tmp_tables"), }, }, "delayed": common.MapStr{ - "errors": h.ToInt("Delayed_errors", status), - "insert_threads": h.ToInt("Delayed_insert_threads", status), - "writes": h.ToInt("Delayed_writes", status), 
+ "errors": h.Int("Delayed_errors"), + "insert_threads": h.Int("Delayed_insert_threads"), + "writes": h.Int("Delayed_writes"), }, - "flush_commands": h.ToInt("Flush_commands", status), - "max_used_connections": h.ToInt("Max_used_connections", status), + "flush_commands": h.Int("Flush_commands"), + "max_used_connections": h.Int("Max_used_connections"), "open": common.MapStr{ - "files": h.ToInt("Open_files", status), - "streams": h.ToInt("Open_streams", status), - "tables": h.ToInt("Open_tables", status), + "files": h.Int("Open_files"), + "streams": h.Int("Open_streams"), + "tables": h.Int("Open_tables"), }, - "opened_tables": h.ToInt("Opened_tables", status), - } + "opened_tables": h.Int("Opened_tables"), + }) +) - return event +// Map data to MapStr of server stats variables: http://dev.mysql.com/doc/refman/5.7/en/server-status-variables.html +// This is only a subset of the available values +func eventMapping(status map[string]string) common.MapStr { + return schema.Apply(status) } diff --git a/metricbeat/module/redis/info/data.go b/metricbeat/module/redis/info/data.go index c91af599bd8..ec20443b6a5 100644 --- a/metricbeat/module/redis/info/data.go +++ b/metricbeat/module/redis/info/data.go @@ -5,116 +5,118 @@ import ( h "github.com/elastic/beats/metricbeat/helper" ) -// Map data to MapStr -func eventMapping(info map[string]string) common.MapStr { - - // Full mapping from info - event := common.MapStr{ +var ( + schema = h.NewSchema(common.MapStr{ "clients": common.MapStr{ - "connected": h.ToInt("connected_clients", info), - "longest_output_list": h.ToInt("client_longest_output_list", info), - "biggest_input_buf": h.ToInt("client_biggest_input_buf", info), - "blocked": h.ToInt("blocked_clients", info), + "connected": h.Int("connected_clients"), + "longest_output_list": h.Int("client_longest_output_list"), + "biggest_input_buf": h.Int("client_biggest_input_buf"), + "blocked": h.Int("blocked_clients"), }, "cluster": common.MapStr{ - "enabled": 
h.ToBool("cluster_enabled", info), + "enabled": h.Bool("cluster_enabled"), }, "cpu": common.MapStr{ "used": common.MapStr{ - "sys": h.ToFloat("used_cpu_sys", info), - "user": h.ToFloat("used_cpu_user", info), - "sys_children": h.ToFloat("used_cpu_sys_children", info), - "user_children": h.ToFloat("used_cpu_user_children", info), + "sys": h.Float("used_cpu_sys"), + "user": h.Float("used_cpu_user"), + "sys_children": h.Float("used_cpu_sys_children"), + "user_children": h.Float("used_cpu_user_children"), }, }, "memory": common.MapStr{ "used": common.MapStr{ - "value": h.ToInt("used_memory", info), // As it is a top key, this goes into value - "rss": h.ToInt("used_memory_rss", info), - "peak": h.ToInt("used_memory_peak", info), - "lua": h.ToInt("used_memory_lua", info), + "value": h.Int("used_memory"), // As it is a top key, this goes into value + "rss": h.Int("used_memory_rss"), + "peak": h.Int("used_memory_peak"), + "lua": h.Int("used_memory_lua"), }, - "allocator": h.ToStr("mem_allocator", info), // Could be moved to server as it rarely changes + "allocator": h.Str("mem_allocator"), // Could be moved to server as it rarely changes }, "persistence": common.MapStr{ - "loading": h.ToBool("loading", info), + "loading": h.Bool("loading"), "rdb": common.MapStr{ - "changes_since_last_save": h.ToInt("rdb_changes_since_last_save", info), - "bgsave_in_progress": h.ToBool("rdb_bgsave_in_progress", info), - "last_save_time": h.ToInt("rdb_last_save_time", info), - "last_bgsave_status": h.ToStr("rdb_last_bgsave_status", info), - "last_bgsave_time_sec": h.ToInt("rdb_last_bgsave_time_sec", info), - "current_bgsave_time_sec": h.ToInt("rdb_current_bgsave_time_sec", info), + "changes_since_last_save": h.Int("rdb_changes_since_last_save"), + "bgsave_in_progress": h.Bool("rdb_bgsave_in_progress"), + "last_save_time": h.Int("rdb_last_save_time"), + "last_bgsave_status": h.Str("rdb_last_bgsave_status"), + "last_bgsave_time_sec": h.Int("rdb_last_bgsave_time_sec"), + 
"current_bgsave_time_sec": h.Int("rdb_current_bgsave_time_sec"), }, "used": common.MapStr{ - "enabled": h.ToBool("aof_enabled", info), - "rewrite_in_progress": h.ToBool("aof_rewrite_in_progress", info), - "rewrite_scheduled": h.ToBool("aof_rewrite_scheduled", info), - "last_rewrite_time_sec": h.ToInt("aof_last_rewrite_time_sec", info), - "current_rewrite_time_sec": h.ToInt("aof_current_rewrite_time_sec", info), - "last_bgrewrite_status": h.ToStr("aof_last_bgrewrite_status", info), - "last_write_status": h.ToStr("aof_last_write_status", info), + "enabled": h.Bool("aof_enabled"), + "rewrite_in_progress": h.Bool("aof_rewrite_in_progress"), + "rewrite_scheduled": h.Bool("aof_rewrite_scheduled"), + "last_rewrite_time_sec": h.Int("aof_last_rewrite_time_sec"), + "current_rewrite_time_sec": h.Int("aof_current_rewrite_time_sec"), + "last_bgrewrite_status": h.Str("aof_last_bgrewrite_status"), + "last_write_status": h.Str("aof_last_write_status"), }, }, "replication": common.MapStr{ - "role": h.ToStr("role", info), - "connected_slaves": h.ToInt("connected_slaves", info), - "master_offset": h.ToInt("master_repl_offset", info), + "role": h.Str("role"), + "connected_slaves": h.Int("connected_slaves"), + "master_offset": h.Int("master_repl_offset"), "backlog": common.MapStr{ - "active": h.ToInt("repl_backlog_active", info), - "size": h.ToInt("repl_backlog_size", info), - "first_byte_offset": h.ToInt("repl_backlog_first_byte_offset", info), - "histlen": h.ToInt("repl_backlog_histlen", info), + "active": h.Int("repl_backlog_active"), + "size": h.Int("repl_backlog_size"), + "first_byte_offset": h.Int("repl_backlog_first_byte_offset"), + "histlen": h.Int("repl_backlog_histlen"), }, }, "server": common.MapStr{ - "version": h.ToStr("redis_version", info), - "git_sha1": h.ToStr("redis_git_sha1", info), - "git_dirty": h.ToStr("redis_git_dirty", info), - "build_id": h.ToStr("redis_build_id", info), - "mode": h.ToStr("redis_mode", info), - "os": h.ToStr("os", info), - "arch_bits": 
h.ToStr("arch_bits", info), - "multiplexing_api": h.ToStr("multiplexing_api", info), - "gcc_version": h.ToStr("gcc_version", info), - "process_id": h.ToInt("process_id", info), - "run_id": h.ToStr("run_id", info), - "tcp_port": h.ToInt("tcp_port", info), - "uptime": h.ToInt("uptime_in_seconds", info), // Uptime days was removed as duplicate - "hz": h.ToInt("hz", info), - "lru_clock": h.ToInt("lru_clock", info), - "config_file": h.ToStr("config_file", info), + "version": h.Str("redis_version"), + "git_sha1": h.Str("redis_git_sha1"), + "git_dirty": h.Str("redis_git_dirty"), + "build_id": h.Str("redis_build_id"), + "mode": h.Str("redis_mode"), + "os": h.Str("os"), + "arch_bits": h.Str("arch_bits"), + "multiplexing_api": h.Str("multiplexing_api"), + "gcc_version": h.Str("gcc_version"), + "process_id": h.Int("process_id"), + "run_id": h.Str("run_id"), + "tcp_port": h.Int("tcp_port"), + "uptime": h.Int("uptime_in_seconds"), // Uptime days was removed as duplicate + "hz": h.Int("hz"), + "lru_clock": h.Int("lru_clock"), + "config_file": h.Str("config_file"), }, "stats": common.MapStr{ "connections": common.MapStr{ - "received": h.ToInt("total_connections_received", info), - "rejected": h.ToInt("rejected_connections", info), + "received": h.Int("total_connections_received"), + "rejected": h.Int("rejected_connections"), }, - "total_commands_processed": h.ToInt("total_commands_processed", info), - "total_net_input_bytes": h.ToInt("total_net_input_bytes", info), - "total_net_output_bytes": h.ToInt("total_net_output_bytes", info), - "instantaneous_ops_per_sec": h.ToInt("instantaneous_ops_per_sec", info), - "instantaneous_input_kbps": h.ToFloat("instantaneous_input_kbps", info), - "instantaneous_output_kbps": h.ToFloat("instantaneous_output_kbps", info), + "total_commands_processed": h.Int("total_commands_processed"), + "total_net_input_bytes": h.Int("total_net_input_bytes"), + "total_net_output_bytes": h.Int("total_net_output_bytes"), + "instantaneous_ops_per_sec": 
h.Int("instantaneous_ops_per_sec"), + "instantaneous_input_kbps": h.Float("instantaneous_input_kbps"), + "instantaneous_output_kbps": h.Float("instantaneous_output_kbps"), "sync": common.MapStr{ - "full": h.ToInt("sync_full", info), - "partial_ok": h.ToInt("sync_partial_ok", info), - "partial_err": h.ToInt("sync_partial_err", info), + "full": h.Int("sync_full"), + "partial_ok": h.Int("sync_partial_ok"), + "partial_err": h.Int("sync_partial_err"), }, "keys": common.MapStr{ - "expired": h.ToInt("expired_keys", info), - "evicted": h.ToInt("evicted_keys", info), + "expired": h.Int("expired_keys"), + "evicted": h.Int("evicted_keys"), }, "keyspace": common.MapStr{ - "hits": h.ToInt("keyspace_hits", info), - "misses": h.ToInt("keyspace_misses", info), + "hits": h.Int("keyspace_hits"), + "misses": h.Int("keyspace_misses"), }, - "pubsub_channels": h.ToInt("pubsub_channels", info), - "pubsub_patterns": h.ToInt("pubsub_patterns", info), - "latest_fork_usec": h.ToInt("latest_fork_usec", info), - "migrate_cached_sockets": h.ToInt("migrate_cached_sockets", info), + "pubsub_channels": h.Int("pubsub_channels"), + "pubsub_patterns": h.Int("pubsub_patterns"), + "latest_fork_usec": h.Int("latest_fork_usec"), + "migrate_cached_sockets": h.Int("migrate_cached_sockets"), }, - } + }) +) - return event +// Map data to MapStr +func eventMapping(info map[string]string) common.MapStr { + + // Full mapping from info + return schema.Apply(info) } diff --git a/metricbeat/module/redis/keyspace/data.go b/metricbeat/module/redis/keyspace/data.go index 84daf018dbf..da5b66d0040 100644 --- a/metricbeat/module/redis/keyspace/data.go +++ b/metricbeat/module/redis/keyspace/data.go @@ -37,6 +37,12 @@ func findKeyspaceStats(info map[string]string) map[string]string { return keyspace } +var schema = h.NewSchema(common.MapStr{ + "keys": h.Int("keys"), + "expires": h.Int("expires"), + "avg_ttl": h.Int("avg_ttl"), +}) + // parseKeyspaceStats resolves the overloaded value string that Redis returns for keyspace 
func parseKeyspaceStats(keyspaceMap map[string]string) map[string]common.MapStr { keyspace := map[string]common.MapStr{} @@ -55,11 +61,7 @@ func parseKeyspaceStats(keyspaceMap map[string]string) map[string]common.MapStr db[stats[0]] = stats[1] } } - keyspace[k] = common.MapStr{ - "keys": h.ToInt("keys", db), - "expires": h.ToInt("expires", db), - "avg_ttl": h.ToInt("avg_ttl", db), - } + keyspace[k] = schema.Apply(db) } } return keyspace diff --git a/metricbeat/module/zookeeper/mntr/data.go b/metricbeat/module/zookeeper/mntr/data.go index 696e169d134..2a8ae7c7764 100644 --- a/metricbeat/module/zookeeper/mntr/data.go +++ b/metricbeat/module/zookeeper/mntr/data.go @@ -13,6 +13,34 @@ import ( var ( // Matches first the variable name, second the param itself paramMatcher = regexp.MustCompile("([^\\s]+)\\s+(.*$)") + schema = h.NewSchema(common.MapStr{ + "version": h.Str("zk_version"), + "latency": common.MapStr{ + "avg": h.Int("zk_avg_latency"), + "min": h.Int("zk_min_latency"), + "max": h.Int("zk_max_latency"), + }, + "packets": common.MapStr{ + "received": h.Int("zk_packets_received"), + "sent": h.Int("zk_packets_sent"), + }, + "num_alive_connections": h.Int("zk_num_alive_connections"), + "outstanding_requests": h.Int("zk_outstanding_requests"), + "server_state": h.Str("zk_server_state"), + "znode_count": h.Int("zk_znode_count"), + "watch_count": h.Int("zk_watch_count"), + "ephemerals_count": h.Int("zk_ephemerals_count"), + "approximate_data_size": h.Int("zk_approximate_data_size"), + }) + schemaLeader = h.NewSchema(common.MapStr{ + "followers": h.Int("zk_followers"), + "synced_followers": h.Int("zk_synced_followers"), + "pending_syncs": h.Int("zk_pending_syncs"), + }) + schemaUnix = h.NewSchema(common.MapStr{ + "open_file_descriptor_count": h.Int("zk_open_file_descriptor_count"), + "max_file_descriptor_count": h.Int("zk_max_file_descriptor_count"), + }) ) func eventMapping(response io.Reader) common.MapStr { @@ -28,38 +56,16 @@ func eventMapping(response io.Reader) 
common.MapStr { } } - // Manually convert and select fields which are used - event := common.MapStr{ - "version": h.ToStr("zk_version", fullEvent), - "latency": common.MapStr{ - "avg": h.ToInt("zk_avg_latency", fullEvent), - "min": h.ToInt("zk_min_latency", fullEvent), - "max": h.ToInt("zk_max_latency", fullEvent), - }, - "packets": common.MapStr{ - "received": h.ToInt("zk_packets_received", fullEvent), - "sent": h.ToInt("zk_packets_sent", fullEvent), - }, - "num_alive_connections": h.ToInt("zk_num_alive_connections", fullEvent), - "outstanding_requests": h.ToInt("zk_outstanding_requests", fullEvent), - "server_state": h.ToStr("zk_server_state", fullEvent), - "znode_count": h.ToInt("zk_znode_count", fullEvent), - "watch_count": h.ToInt("zk_watch_count", fullEvent), - "ephemerals_count": h.ToInt("zk_ephemerals_count", fullEvent), - "approximate_data_size": h.ToInt("zk_approximate_data_size", fullEvent), - } + event := schema.Apply(fullEvent) // only exposed by the Leader if _, ok := fullEvent["zk_followers"]; ok { - event["followers"] = h.ToInt("zk_followers", fullEvent) - event["synced_followers"] = h.ToInt("zk_synced_followers", fullEvent) - event["pending_syncs"] = h.ToInt("zk_pending_syncs", fullEvent) + schemaLeader.ApplyTo(event, fullEvent) } // only available on Unix platforms if _, ok := fullEvent["open_file_descriptor_count"]; ok { - event["open_file_descriptor_count"] = h.ToInt("zk_open_file_descriptor_count", fullEvent) - event["max_file_descriptor_count"] = h.ToInt("zk_max_file_descriptor_count", fullEvent) + schemaUnix.ApplyTo(event, fullEvent) } return event