[DRAFT] move telemetry and expiring registry to separate packages #291

Closed · wants to merge 1 commit

event.go (3 changes: 2 additions & 1 deletion)
@@ -14,6 +14,7 @@
package main

import (
"github.com/prometheus/statsd_exporter/pkg/telemetry"
"sync"
"time"

@@ -114,7 +115,7 @@ func (eq *eventQueue) flush() {
func (eq *eventQueue) flushUnlocked() {
eq.c <- eq.q
eq.q = make([]Event, 0, cap(eq.q))
- eventsFlushed.Inc()
+ telemetry.EventsFlushed.Inc()
}

func (eq *eventQueue) len() int {
exporter.go (87 changes: 44 additions & 43 deletions)
@@ -27,9 +27,10 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/statsd_exporter/pkg/clock"
"github.com/prometheus/statsd_exporter/pkg/expiringregistry"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"github.com/prometheus/statsd_exporter/pkg/telemetry"
)

const (
@@ -50,7 +51,7 @@ func (u uncheckedCollector) Collect(c chan<- prometheus.Metric) {

type Exporter struct {
mapper *mapper.MetricMapper
- registry *registry
+ registry *expiringregistry.Registry
logger log.Logger
}

@@ -62,7 +63,7 @@ func (b *Exporter) Listen(e <-chan Events) {
for {
select {
case <-removeStaleMetricsTicker.C:
- b.registry.removeStaleMetrics()
+ b.registry.RemoveStaleMetrics()
Contributor:
Could this be part of a maintenance routine internal to the expiring registry? That way we would not have to rely on library users to run it regularly.
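
For illustration, a rough sketch of what such an internal routine could look like, assuming the registry owns its own ticker; the StartMaintenance name, the interval argument, and the stop channel are hypothetical and not part of this PR:

```go
// Hypothetical sketch: the registry drives its own stale-metric sweep so
// callers no longer have to run a ticker themselves. Assumes Registry has
// a stop chan struct{} field and that "time" is imported.
func (r *Registry) StartMaintenance(interval time.Duration) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r.RemoveStaleMetrics()
			case <-r.stop:
				return
			}
		}
	}()
}
```

NewRegistry could start this goroutine itself and expose a Close method that closes the stop channel, so the exporter's Listen loop would not need the removeStaleMetricsTicker at all.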

case events, ok := <-e:
if !ok {
level.Debug(b.logger).Log("msg", "Channel is closed. Break out of Exporter.Listener.")
@@ -87,7 +88,7 @@ func (b *Exporter) handleEvent(event Event) {
}

if mapping.Action == mapper.ActionTypeDrop {
eventsActions.WithLabelValues("drop").Inc()
telemetry.EventsActions.WithLabelValues("drop").Inc()
return
}

@@ -101,16 +102,16 @@ func (b *Exporter) handleEvent(event Event) {
if present {
if mapping.Name == "" {
level.Debug(b.logger).Log("msg", "The mapping generates an empty metric name", "metric_name", event.MetricName(), "match", mapping.Match)
errorEventStats.WithLabelValues("empty_metric_name").Inc()
telemetry.ErrorEventStats.WithLabelValues("empty_metric_name").Inc()
return
}
metricName = mapper.EscapeMetricName(mapping.Name)
for label, value := range labels {
prometheusLabels[label] = value
}
- eventsActions.WithLabelValues(string(mapping.Action)).Inc()
+ telemetry.EventsActions.WithLabelValues(string(mapping.Action)).Inc()
} else {
- eventsUnmapped.Inc()
+ telemetry.EventsUnmapped.Inc()
metricName = mapper.EscapeMetricName(event.MetricName())
}

@@ -120,32 +121,32 @@ func (b *Exporter) handleEvent(event Event) {
// will cause the exporter to panic. Instead we will warn and continue to the next event.
if event.Value() < 0.0 {
level.Debug(b.logger).Log("msg", "counter must be non-negative value", "metric", metricName, "event_value", event.Value())
errorEventStats.WithLabelValues("illegal_negative_counter").Inc()
telemetry.ErrorEventStats.WithLabelValues("illegal_negative_counter").Inc()
return
}

- counter, err := b.registry.getCounter(metricName, prometheusLabels, help, mapping)
+ counter, err := b.registry.GetCounter(metricName, prometheusLabels, help, mapping.Ttl)
if err == nil {
counter.Add(event.Value())
eventStats.WithLabelValues("counter").Inc()
telemetry.EventStats.WithLabelValues("counter").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("counter").Inc()
telemetry.ConflictingEventStats.WithLabelValues("counter").Inc()
}

case *GaugeEvent:
- gauge, err := b.registry.getGauge(metricName, prometheusLabels, help, mapping)
+ gauge, err := b.registry.GetGauge(metricName, prometheusLabels, help, mapping.Ttl)

if err == nil {
if ev.relative {
gauge.Add(event.Value())
} else {
gauge.Set(event.Value())
}
eventStats.WithLabelValues("gauge").Inc()
telemetry.EventStats.WithLabelValues("gauge").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("gauge").Inc()
telemetry.ConflictingEventStats.WithLabelValues("gauge").Inc()
}

case *TimerEvent:
@@ -159,23 +160,23 @@ func (b *Exporter) handleEvent(event Event) {

switch t {
case mapper.TimerTypeHistogram:
- histogram, err := b.registry.getHistogram(metricName, prometheusLabels, help, mapping)
+ histogram, err := b.registry.GetHistogram(metricName, prometheusLabels, help, mapping.Buckets, mapping.Ttl)
Contributor:
Conflict warning: this will change slightly with #290, since we are about to move Buckets and Quantiles into HistogramOptions/SummaryOptions sub-structs. It will be easy to adapt, though.
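
To make the conflict concrete, the call here would presumably adapt to something like the following once #290 lands; the HistogramOptions field name is an assumption based on that description:

```go
// Hypothetical call shape after #290: buckets come from a sub-struct on the
// mapping instead of the mapping itself (field name assumed).
histogram, err := b.registry.GetHistogram(metricName, prometheusLabels, help, mapping.HistogramOptions.Buckets, mapping.Ttl)
```

The GetSummary call below would change the same way for Quantiles.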

if err == nil {
histogram.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
telemetry.EventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
telemetry.ConflictingEventStats.WithLabelValues("timer").Inc()
}

case mapper.TimerTypeDefault, mapper.TimerTypeSummary:
- summary, err := b.registry.getSummary(metricName, prometheusLabels, help, mapping)
+ summary, err := b.registry.GetSummary(metricName, prometheusLabels, help, mapping.Quantiles, mapping.Ttl)
if err == nil {
summary.Observe(event.Value() / 1000) // prometheus presumes seconds, statsd millisecond
eventStats.WithLabelValues("timer").Inc()
telemetry.EventStats.WithLabelValues("timer").Inc()
} else {
level.Debug(b.logger).Log("msg", regErrF, "metric", metricName, "error", err)
conflictingEventStats.WithLabelValues("timer").Inc()
telemetry.ConflictingEventStats.WithLabelValues("timer").Inc()
}

default:
@@ -185,14 +186,14 @@ func (b *Exporter) handleEvent(event Event) {

default:
level.Debug(b.logger).Log("msg", "Unsupported event type")
eventStats.WithLabelValues("illegal").Inc()
telemetry.EventStats.WithLabelValues("illegal").Inc()
}
}

func NewExporter(mapper *mapper.MetricMapper, logger log.Logger) *Exporter {
return &Exporter{
mapper: mapper,
- registry: newRegistry(mapper),
+ registry: expiringregistry.NewRegistry(&mapper.Defaults, telemetry.MetricsCount),
logger: logger,
}
}
@@ -228,7 +229,7 @@ func buildEvent(statType, metric string, value float64, relative bool, labels ma
func parseTag(component, tag string, separator rune, labels map[string]string, logger log.Logger) {
// Entirely empty tag is an error
if len(tag) == 0 {
- tagErrors.Inc()
+ telemetry.TagErrors.Inc()
level.Debug(logger).Log("msg", "Empty name tag", "component", component)
return
}
@@ -240,7 +241,7 @@ func parseTag(component, tag string, separator rune, labels map[string]string, l

if len(k) == 0 || len(v) == 0 {
// Empty key or value is an error
- tagErrors.Inc()
+ telemetry.TagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "k", k, "v", v, "component", component)
} else {
labels[mapper.EscapeMetricName(k)] = v
@@ -250,7 +251,7 @@ func parseTag(component, tag string, separator rune, labels map[string]string, l
}

// Missing separator (no value) is an error
- tagErrors.Inc()
+ telemetry.TagErrors.Inc()
level.Debug(logger).Log("msg", "Malformed name tag", "tag", tag, "component", component)
}

@@ -317,7 +318,7 @@ func lineToEvents(line string, logger log.Logger) Events {

elements := strings.SplitN(line, ":", 2)
if len(elements) < 2 || len(elements[0]) == 0 || !utf8.ValidString(line) {
sampleErrors.WithLabelValues("malformed_line").Inc()
telemetry.SampleErrors.WithLabelValues("malformed_line").Inc()
level.Debug(logger).Log("msg", "Bad line from StatsD", "line", line)
return events
}
@@ -331,7 +332,7 @@ func lineToEvents(line string, logger log.Logger) Events {

// don't allow mixed tagging styles
if len(labels) > 0 {
sampleErrors.WithLabelValues("mixed_tagging_styles").Inc()
telemetry.SampleErrors.WithLabelValues("mixed_tagging_styles").Inc()
level.Debug(logger).Log("msg", "Bad line (multiple tagging styles) from StatsD", "line", line)
return events
}
@@ -343,11 +344,11 @@ func lineToEvents(line string, logger log.Logger) Events {
}
samples:
for _, sample := range samples {
- samplesReceived.Inc()
+ telemetry.SamplesReceived.Inc()
components := strings.Split(sample, "|")
samplingFactor := 1.0
if len(components) < 2 || len(components) > 4 {
sampleErrors.WithLabelValues("malformed_component").Inc()
telemetry.SampleErrors.WithLabelValues("malformed_component").Inc()
level.Debug(logger).Log("msg", "Bad component", "line", line)
continue
}
@@ -361,7 +362,7 @@ samples:
value, err := strconv.ParseFloat(valueStr, 64)
if err != nil {
level.Debug(logger).Log("msg", "Bad value", "value", valueStr, "line", line)
sampleErrors.WithLabelValues("malformed_value").Inc()
telemetry.SampleErrors.WithLabelValues("malformed_value").Inc()
continue
}

@@ -370,7 +371,7 @@ samples:
for _, component := range components[2:] {
if len(component) == 0 {
level.Debug(logger).Log("msg", "Empty component", "line", line)
sampleErrors.WithLabelValues("malformed_component").Inc()
telemetry.SampleErrors.WithLabelValues("malformed_component").Inc()
continue samples
}
}
@@ -382,7 +383,7 @@ samples:
samplingFactor, err = strconv.ParseFloat(component[1:], 64)
if err != nil {
level.Debug(logger).Log("msg", "Invalid sampling factor", "component", component[1:], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
telemetry.SampleErrors.WithLabelValues("invalid_sample_factor").Inc()
}
if samplingFactor == 0 {
samplingFactor = 1
@@ -399,21 +400,21 @@ samples:
parseDogStatsDTags(component[1:], labels, logger)
default:
level.Debug(logger).Log("msg", "Invalid sampling factor or tag section", "component", components[2], "line", line)
sampleErrors.WithLabelValues("invalid_sample_factor").Inc()
telemetry.SampleErrors.WithLabelValues("invalid_sample_factor").Inc()
continue
}
}
}

if len(labels) > 0 {
- tagsReceived.Inc()
+ telemetry.TagsReceived.Inc()
}

for i := 0; i < multiplyEvents; i++ {
event, err := buildEvent(statType, metric, value, relative, labels)
if err != nil {
level.Debug(logger).Log("msg", "Error building event", "line", line, "error", err)
sampleErrors.WithLabelValues("illegal_event").Inc()
telemetry.SampleErrors.WithLabelValues("illegal_event").Inc()
continue
}
events = append(events, event)
@@ -450,10 +451,10 @@ func (l *StatsDUDPListener) Listen() {
}

func (l *StatsDUDPListener) handlePacket(packet []byte) {
- udpPackets.Inc()
+ telemetry.UdpPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
- linesReceived.Inc()
+ telemetry.LinesReceived.Inc()
l.eventHandler.queue(lineToEvents(line, l.logger))
}
}
@@ -487,24 +488,24 @@ func (l *StatsDTCPListener) Listen() {
func (l *StatsDTCPListener) handleConn(c *net.TCPConn) {
defer c.Close()

- tcpConnections.Inc()
+ telemetry.TcpConnections.Inc()

r := bufio.NewReader(c)
for {
line, isPrefix, err := r.ReadLine()
if err != nil {
if err != io.EOF {
- tcpErrors.Inc()
+ telemetry.TcpErrors.Inc()
level.Debug(l.logger).Log("msg", "Read failed", "addr", c.RemoteAddr(), "error", err)
}
break
}
if isPrefix {
- tcpLineTooLong.Inc()
+ telemetry.TcpLineTooLong.Inc()
level.Debug(l.logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr())
break
}
- linesReceived.Inc()
+ telemetry.LinesReceived.Inc()
l.eventHandler.queue(lineToEvents(string(line), l.logger))
}
}
@@ -537,10 +538,10 @@ func (l *StatsDUnixgramListener) Listen() {
}

func (l *StatsDUnixgramListener) handlePacket(packet []byte) {
- unixgramPackets.Inc()
+ telemetry.UnixgramPackets.Inc()
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
- linesReceived.Inc()
+ telemetry.LinesReceived.Inc()
l.eventHandler.queue(lineToEvents(string(line), l.logger))
}
}