Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export health metrics #3432

Merged
merged 1 commit into from
Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions libbeat/beat/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package beat

import (
"runtime"

"github.com/elastic/beats/libbeat/monitoring"
)

type memstatsVar struct{}

var (
metrics = monitoring.Default.NewRegistry("beat")
)

func init() {

var ms memstatsVar
metrics.Add("memstats", ms, monitoring.Reported)
}

func (memstatsVar) Visit(m monitoring.Mode, V monitoring.Visitor) {
var stats runtime.MemStats
runtime.ReadMemStats(&stats)

V.OnRegistryStart()
defer V.OnRegistryFinished()

monitoring.ReportInt(V, "memory_total", int64(stats.TotalAlloc))
if m == monitoring.Full {
monitoring.ReportInt(V, "memory_alloc", int64(stats.Alloc))
monitoring.ReportInt(V, "gc_next", int64(stats.NextGC))
}
}
29 changes: 16 additions & 13 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"io/ioutil"
Expand All @@ -14,6 +13,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/outil"
Expand Down Expand Up @@ -70,14 +70,14 @@ type Connection struct {

// Metrics that can retrieved through the expvar web interface.
var (
ackedEvents = expvar.NewInt("libbeat.es.published_and_acked_events")
eventsNotAcked = expvar.NewInt("libbeat.es.published_but_not_acked_events")
publishEventsCallCount = expvar.NewInt("libbeat.es.call_count.PublishEvents")

statReadBytes = expvar.NewInt("libbeat.es.publish.read_bytes")
statWriteBytes = expvar.NewInt("libbeat.es.publish.write_bytes")
statReadErrors = expvar.NewInt("libbeat.es.publish.read_errors")
statWriteErrors = expvar.NewInt("libbeat.es.publish.write_errors")
ackedEvents = monitoring.NewInt(outputs.Metrics, "elasticsearch.events.acked")
eventsNotAcked = monitoring.NewInt(outputs.Metrics, "elasticsearch.events.not_acked")
publishEventsCallCount = monitoring.NewInt(outputs.Metrics, "elasticsearch.publishEvents.call.count")

statReadBytes = monitoring.NewInt(outputs.Metrics, "elasticsearch.read.bytes")
statWriteBytes = monitoring.NewInt(outputs.Metrics, "elasticsearch.write.bytes")
statReadErrors = monitoring.NewInt(outputs.Metrics, "elasticsearch.read.errors")
statWriteErrors = monitoring.NewInt(outputs.Metrics, "elasticsearch.write.errors")
)

var (
Expand Down Expand Up @@ -132,10 +132,12 @@ func NewClient(
}

iostats := &transport.IOStats{
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
OutputsWrite: outputs.WriteBytes,
OutputsWriteErrors: outputs.WriteErrors,
}
dialer = transport.StatsDialer(dialer, iostats)
tlsDialer = transport.StatsDialer(tlsDialer, iostats)
Expand Down Expand Up @@ -265,6 +267,7 @@ func (client *Client) PublishEvents(
}

ackedEvents.Add(int64(len(data) - len(failedEvents)))
outputs.AckedEvents.Add(int64(len(data) - len(failedEvents)))
eventsNotAcked.Add(int64(len(failedEvents)))
if len(failedEvents) > 0 {
if sendErr == nil {
Expand Down
10 changes: 6 additions & 4 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka

import (
"expvar"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -12,6 +11,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
)
Expand All @@ -38,9 +38,9 @@ type msgRef struct {
}

var (
ackedEvents = expvar.NewInt("libbeat.kafka.published_and_acked_events")
eventsNotAcked = expvar.NewInt("libbeat.kafka.published_but_not_acked_events")
publishEventsCallCount = expvar.NewInt("libbeat.kafka.call_count.PublishEvents")
ackedEvents = monitoring.NewInt(outputs.Metrics, "kafka.events.acked")
eventsNotAcked = monitoring.NewInt(outputs.Metrics, "kafka.events.not_acked")
publishEventsCallCount = monitoring.NewInt(outputs.Metrics, "kafka.publishEvents.call.count")
)

func newKafkaClient(
Expand Down Expand Up @@ -234,12 +234,14 @@ func (r *msgRef) dec() {
eventsNotAcked.Add(int64(failed))
if success > 0 {
ackedEvents.Add(int64(success))
outputs.AckedEvents.Add(int64(success))
}

debugf("Kafka publish failed with: %v", err)
r.cb(r.failed, err)
} else {
ackedEvents.Add(int64(r.total))
outputs.AckedEvents.Add(int64(r.total))
r.cb(nil, nil)
}
}
8 changes: 4 additions & 4 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/Shopify/sarama"
metrics "github.com/rcrowley/go-metrics"
gometrics "github.com/rcrowley/go-metrics"
"github.com/rcrowley/go-metrics/exp"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -40,12 +40,12 @@ const (
defaultMaxWaitRetry = 60 * time.Second
)

var kafkaMetricsRegistryInstance metrics.Registry
var kafkaMetricsRegistryInstance gometrics.Registry

func init() {
sarama.Logger = kafkaLogger{}

reg := metrics.NewPrefixedRegistry("libbeat.kafka.")
reg := gometrics.NewPrefixedRegistry("libbeat.kafka.")

// Note: registers /debug/metrics handler for displaying all expvar counters
exp.Exp(reg)
Expand All @@ -56,7 +56,7 @@ func init() {

var kafkaMetricsOnce sync.Once

func kafkaMetricsRegistry() metrics.Registry {
func kafkaMetricsRegistry() gometrics.Registry {
return kafkaMetricsRegistryInstance
}

Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func (r *msgRef) callback(seq uint32, err error) {

func (r *msgRef) done(n uint32) {
ackedEvents.Add(int64(n))
outputs.AckedEvents.Add(int64(n))
r.batch = r.batch[n:]
r.win.tryGrowWindow(r.batchSize)
r.dec()
}

func (r *msgRef) fail(n uint32, err error) {
ackedEvents.Add(int64(n))
outputs.AckedEvents.Add(int64(n))
r.err = err
r.batch = r.batch[n:]
r.win.shrinkWindow()
Expand Down
28 changes: 15 additions & 13 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ package logstash
// registered with all output plugins

import (
"expvar"
"time"

"github.com/elastic/go-lumber/log"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
Expand All @@ -22,14 +22,14 @@ var debug = logp.MakeDebug("logstash")

// Metrics that can retrieved through the expvar web interface.
var (
ackedEvents = expvar.NewInt("libbeat.logstash.published_and_acked_events")
eventsNotAcked = expvar.NewInt("libbeat.logstash.published_but_not_acked_events")
publishEventsCallCount = expvar.NewInt("libbeat.logstash.call_count.PublishEvents")

statReadBytes = expvar.NewInt("libbeat.logstash.publish.read_bytes")
statWriteBytes = expvar.NewInt("libbeat.logstash.publish.write_bytes")
statReadErrors = expvar.NewInt("libbeat.logstash.publish.read_errors")
statWriteErrors = expvar.NewInt("libbeat.logstash.publish.write_errors")
ackedEvents = monitoring.NewInt(outputs.Metrics, "logstash.events.acked")
eventsNotAcked = monitoring.NewInt(outputs.Metrics, "logstash.events.not_acked")
publishEventsCallCount = monitoring.NewInt(outputs.Metrics, "logstash.publishEvents.call.count")

statReadBytes = monitoring.NewInt(outputs.Metrics, "logstash.read.bytes")
statWriteBytes = monitoring.NewInt(outputs.Metrics, "logstash.write.bytes")
statReadErrors = monitoring.NewInt(outputs.Metrics, "logstash.read.errors")
statWriteErrors = monitoring.NewInt(outputs.Metrics, "logstash.write.errors")
)

const (
Expand Down Expand Up @@ -80,10 +80,12 @@ func (lj *logstash) init(cfg *common.Config) error {
Proxy: &config.Proxy,
TLS: tls,
Stats: &transport.IOStats{
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
OutputsWrite: outputs.WriteBytes,
OutputsWriteErrors: outputs.WriteErrors,
},
}

Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ func (c *client) PublishEvents(

eventsNotAcked.Add(int64(len(data)))
ackedEvents.Add(int64(totalNumberOfEvents - len(data)))
outputs.AckedEvents.Add(int64(totalNumberOfEvents - len(data)))
return data, err
}
}
ackedEvents.Add(int64(totalNumberOfEvents))
outputs.AckedEvents.Add(int64(totalNumberOfEvents))
return nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/mode/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package mode

import (
"errors"
"expvar"
"time"

"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
)

// Metrics that can retrieved through the expvar web interface.
var (
messagesDropped = expvar.NewInt("libbeat.outputs.messages_dropped")
messagesDropped = monitoring.NewInt(outputs.Metrics, "messages.dropped")
)

// ErrNoHostsConfigured indicates missing host or hosts configuration
Expand Down
9 changes: 9 additions & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
)

type Options struct {
Expand Down Expand Up @@ -72,6 +73,14 @@ type bulkOutputAdapter struct {

var outputsPlugins = make(map[string]OutputBuilder)

var (
Metrics = monitoring.Default.NewRegistry("output")

AckedEvents = monitoring.NewInt(Metrics, "events.acked", monitoring.Report)
WriteBytes = monitoring.NewInt(Metrics, "write.bytes", monitoring.Report)
WriteErrors = monitoring.NewInt(Metrics, "write.errors", monitoring.Report)
)

func RegisterOutputPlugin(name string, builder OutputBuilder) {
outputsPlugins[name] = builder
}
Expand Down
12 changes: 12 additions & 0 deletions libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand All @@ -19,6 +20,11 @@ var (
versionRegex = regexp.MustCompile(`redis_version:(\d+).(\d+)`)
)

var (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis.published.events.acked
redis.published.events.not_acked

ackedEvents = monitoring.NewInt(outputs.Metrics, "redis.events.acked")
eventsNotAcked = monitoring.NewInt(outputs.Metrics, "redis.events.not_acked")
)

type publishFn func(
keys outil.Selector,
data []outputs.Data,
Expand Down Expand Up @@ -186,7 +192,10 @@ func publishEventsBulk(conn redis.Conn, key outil.Selector, command string, code
if err != nil {
logp.Err("Failed to %v to redis list with %v", command, err)
return data, err

}
ackedEvents.Add(int64(len(data)))
outputs.AckedEvents.Add(int64(len(data)))

return nil, nil
}
Expand Down Expand Up @@ -239,6 +248,9 @@ func publishEventsPipeline(conn redis.Conn, command string, codec outputs.Codec)
}
}
}
ackedEvents.Add(int64(len(okEvents) - len(failed)))
outputs.AckedEvents.Add(int64(len(okEvents) - len(failed)))
eventsNotAcked.Add(int64(len(failed)))
return failed, lastErr
}
}
Expand Down
20 changes: 11 additions & 9 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package redis

import (
"errors"
"expvar"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
Expand All @@ -25,10 +25,10 @@ var debugf = logp.MakeDebug("redis")

// Metrics that can retrieved through the expvar web interface.
var (
statReadBytes = expvar.NewInt("libbeat.redis.publish.read_bytes")
statWriteBytes = expvar.NewInt("libbeat.redis.publish.write_bytes")
statReadErrors = expvar.NewInt("libbeat.redis.publish.read_errors")
statWriteErrors = expvar.NewInt("libbeat.redis.publish.write_errors")
statReadBytes = monitoring.NewInt(outputs.Metrics, "redis.read.bytes")
statWriteBytes = monitoring.NewInt(outputs.Metrics, "redis.write.bytes")
statReadErrors = monitoring.NewInt(outputs.Metrics, "redis.read.errors")
statWriteErrors = monitoring.NewInt(outputs.Metrics, "redis.write.errors")
)

const (
Expand Down Expand Up @@ -103,10 +103,12 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
Proxy: &config.Proxy,
TLS: tls,
Stats: &transport.IOStats{
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
Read: statReadBytes,
Write: statWriteBytes,
ReadErrors: statReadErrors,
WriteErrors: statWriteErrors,
OutputsWrite: outputs.WriteBytes,
OutputsWriteErrors: outputs.WriteErrors,
},
}

Expand Down
Loading